Using Azure Blob Storage and Parquet

Objective

This notebook shows how to interact with Parquet on Azure Blob Storage.

Please note that I have not been able to write Parquet to Blob Storage using PySpark. I have tried versions 2.2, 2.3, and 2.4, but none of them works (yet): the job connects and creates the folder, but no data is written. Azure support was not able to help me, other than advising me to use HDInsight.
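
For reference, the kind of write I was attempting looks roughly like the sketch below. This is a hypothetical example: the account and container names are placeholders, and sdf stands for a PySpark dataframe like the one used later in this notebook.

# Hypothetical write attempt: in my tests this creates the target folder
# on Blob Storage but never writes any Parquet data.
account_name = "<youraccountname>"    # placeholder
container_name = "<yourcontainer>"    # placeholder
path = f"wasbs://{container_name}@{account_name}.blob.core.windows.net/characters.parquet"
sdf.write.mode("overwrite").parquet(path)  # sdf: a PySpark dataframe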

Imports

In [1]:
from azure.storage.blob import BlockBlobService
import pandas as pd
import pyarrow.parquet as pq
from io import BytesIO
from configparser import RawConfigParser
from pyspark import SparkConf, SparkContext, SQLContext

Definitions

In [2]:
BLOB_NAME = "characters.parquet"

Setup Blob

In [3]:
# Read the configuration
config = RawConfigParser()
config.read("blobconfig.ini")
# Create blob_service
blob_service = BlockBlobService(
    account_name=config["blob-store"]["blob_account_name"],
    account_key=config["blob-store"]["blob_account_key"],
)
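
For completeness, blobconfig.ini is assumed to contain a [blob-store] section with the keys used in this notebook; the values below are placeholders.

[blob-store]
blob_account_name = <youraccountname>
blob_account_key = <youraccountkey>
blob_container = <yourcontainer>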

Setup Spark

In order to connect to Azure Blob Storage with Spark, we need to download two JARs (hadoop-azure-2.7.3.jar and azure-storage-6.1.0.jar) and add them to the Spark configuration. I chose these specific versions because they were the only ones I got working for reading data with Spark 2.4.0. Additionally, the fs.azure property needs to be set to the NativeAzureFileSystem implementation, and fs.azure.account.key.<youraccountname>.blob.core.windows.net should contain the account key.

In [4]:
def setup_spark(config):
    """ Setup Spark to connect to Azure Blob Storage """
    jars = [
        "spark-2.4.0-bin-hadoop2.7/jars/hadoop-azure-2.7.3.jar",
        "spark-2.4.0-bin-hadoop2.7/jars/azure-storage-6.1.0.jar",
    ]
    conf = (
        SparkConf()
        .setAppName("Spark Blob Test")
        .set("spark.driver.extraClassPath", ":".join(jars))
        .set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
        .set(
            f"fs.azure.account.key.{config['blob-store']['blob_account_name']}.blob.core.windows.net",
            config["blob-store"]["blob_account_key"],
        )
    )
    sc = SparkContext.getOrCreate(conf=conf)

    return SQLContext(sc)
In [5]:
# Create Spark context
sql_context = setup_spark(config)
In [6]:
# Create dataframe
df = pd.DataFrame.from_dict(
    [("Mario", "Red"), ("Luigi", "Green"), ("Princess", "Pink")]
).rename(columns={0: "name", 1: "color"})
print(df.head())
       name  color
0     Mario    Red
1     Luigi  Green
2  Princess   Pink

Using PyArrow with Pandas, it is easy to write a dataframe to Blob Storage: convert the Pandas dataframe to Parquet in an in-memory buffer and write the buffer to a blob.

In [7]:
def write_pandas_dataframe_to_blob(blob_service, df, container_name, blob_name):
    """ Write Pandas dataframe to blob storage """
    buffer = BytesIO()
    df.to_parquet(buffer)
    blob_service.create_blob_from_bytes(
        container_name=container_name, blob_name=blob_name, blob=buffer.getvalue()
    )
In [8]:
# Write to blob using pyarrow
write_pandas_dataframe_to_blob(blob_service, df, config['blob-store']['blob_container'], BLOB_NAME)
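
To sanity-check that the write succeeded, the blobs in the container can be listed; this is a small optional check and not part of the original flow.

# Optional check: confirm the Parquet blob now exists in the container.
container = config["blob-store"]["blob_container"]
blob_names = [blob.name for blob in blob_service.list_blobs(container)]
print(BLOB_NAME in blob_names)  # expected: True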

Reading the data back from the blob using Pandas mirrors the write: the blob is downloaded into a BytesIO stream and then read with PyArrow.

In [9]:
def get_pandas_dataframe_from_parquet_on_blob(blob_service, container_name, blob_name):
    """ Get a dataframe from Parquet file on blob storage """
    byte_stream = BytesIO()
    try:
        blob_service.get_blob_to_stream(
            container_name=container_name, blob_name=blob_name, stream=byte_stream
        )
        df = pq.read_table(source=byte_stream).to_pandas()
    finally:
        byte_stream.close()
    return df
In [10]:
# Read from blob using pyarrow
rdf = get_pandas_dataframe_from_parquet_on_blob(
    blob_service, config['blob-store']['blob_container'], BLOB_NAME
)
print(rdf.head())
       name  color
0     Mario    Red
1     Luigi  Green
2  Princess   Pink

Reading a single-file Parquet blob with Spark is done using the following function. Note that the path uses the wasbs protocol.

In [11]:
def get_pyspark_dataframe_from_parquet_on_blob(config, sql_context, container_name, blob_name):
    """ Get a dataframe from Parquet file on blob storage using PySpark """
    path = f"wasbs://{container_name}@{config['blob-store']['blob_account_name']}.blob.core.windows.net/{blob_name}"
    return sql_context.read.parquet(path)
In [12]:
# Read from blob using PySpark
sdf = get_pyspark_dataframe_from_parquet_on_blob(
    config, sql_context, config['blob-store']['blob_container'], BLOB_NAME
)
sdf.show()
+--------+-----+-----------------+
|    name|color|__index_level_0__|
+--------+-----+-----------------+
|   Mario|  Red|                0|
|   Luigi|Green|                1|
|Princess| Pink|                2|
+--------+-----+-----------------+

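The extra __index_level_0__ column in the Spark output is the Pandas index, which PyArrow writes into the Parquet file by default. If it is not wanted, the dataframe can be serialized without the index, assuming a Pandas version where to_parquet accepts the index argument; a minimal sketch based on the write function above:

# Serialize without the Pandas index so Spark only sees the real columns
# (requires a Pandas version where to_parquet supports index=False).
buffer = BytesIO()
df.to_parquet(buffer, index=False)
blob_service.create_blob_from_bytes(
    container_name=config["blob-store"]["blob_container"],
    blob_name=BLOB_NAME,
    blob=buffer.getvalue(),
)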