Setting up Spark with MinIO as object storage

Introduction

At work we use AWS S3 for our data lake. Since I am working on some data projects, I would like to have a similar experience, but without AWS and simply on my own server. This is why I chose MinIO as object storage: it is free, runs on Ubuntu and is compatible with the AWS S3 API.

Installation

The Ansible configuration from my previous blog post already installed an older version of Spark. During my attempts to get MinIO working with Spark, I had to try different combinations of Spark, Hadoop and AWS library versions before the installation worked. I use the latest Spark from the download page, which at the time of writing is 2.4.3. The prebuilt Spark packages only ship with Hadoop 2.7 or lower, and since I have to use the latest Hadoop version (3.1.2), I get the Spark download without Hadoop. For the AWS libraries that are needed, I could only get the integration to work with version 1.11.534.
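A rough sketch of those downloads (the /opt locations match the environment variables used later on; the archive URLs are assumptions based on the usual Apache archive layout):

wget https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-without-hadoop.tgz
tar -xzf spark-2.4.3-bin-without-hadoop.tgz
sudo mv spark-2.4.3-bin-without-hadoop /opt/spark

wget https://archive.apache.org/dist/hadoop/common/hadoop-3.1.2/hadoop-3.1.2.tar.gz
tar -xzf hadoop-3.1.2.tar.gz
sudo mv hadoop-3.1.2 /opt/hadoop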

The following Java libraries are needed to get MinIO working with Spark: hadoop-aws matching the Hadoop version (3.1.2) and the AWS Java SDK at version 1.11.534.
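One way to get them onto the classpath is to drop the jars into Spark's jars folder; a minimal sketch, assuming the single aws-java-sdk-bundle artifact covers the SDK side and that these exact versions are available on Maven Central:

# hadoop-aws matching Hadoop 3.1.2
sudo wget -P /opt/spark/jars https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.2/hadoop-aws-3.1.2.jar
# AWS SDK bundle at the version mentioned above
sudo wget -P /opt/spark/jars https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.534/aws-java-sdk-bundle-1.11.534.jar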

To run the MinIO server, I first create a minio user and a minio group. Additionally, I create the data folder where MinIO will store its data. After preparing the environment I install MinIO and add it as a service at /etc/systemd/system/minio.service.
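A sketch of those preparation steps, assuming the user, group and data folder names used in the configuration below (the download URL is the generic MinIO Linux binary):

# Dedicated system user and group for MinIO
sudo groupadd --system minio
sudo useradd --system --gid minio --no-create-home --shell /usr/sbin/nologin minio

# Data folder that will be used as MINIO_VOLUMES
sudo mkdir -p /minio-data
sudo chown minio:minio /minio-data

# Install the server binary
sudo wget -O /usr/local/bin/minio https://dl.min.io/server/minio/release/linux-amd64/minio
sudo chmod +x /usr/local/bin/minio

The service definition looks as follows: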

[Unit]
Description=MinIO
Documentation=https://docs.minio.io
Wants=network-online.target
After=network-online.target
AssertFileIsExecutable=/usr/local/bin/minio

[Service]
WorkingDirectory=/usr/local/

User=minio
Group=minio

PermissionsStartOnly=true

EnvironmentFile=/etc/default/minio
ExecStartPre=/bin/bash -c "[ -n \"${MINIO_VOLUMES}\" ] || echo \"Variable MINIO_VOLUMES not set in /etc/default/minio\""

ExecStart=/usr/local/bin/minio server $MINIO_OPTS $MINIO_VOLUMES

# Let systemd restart this service only if it has ended with the clean exit code or signal.
Restart=on-success

StandardOutput=journal
StandardError=inherit

# Specifies the maximum file descriptor number that can be opened by this process
LimitNOFILE=65536

# Disable timeout logic and wait until process is stopped
TimeoutStopSec=0

# SIGTERM signal is used to stop MinIO
KillSignal=SIGTERM

SendSIGKILL=no

SuccessExitStatus=0


[Install]
WantedBy=multi-user.target

The MinIO environment file located at /etc/default/minio contains the configuration for the volume, the port and the credentials.

# MinIO local/remote volumes.
MINIO_VOLUMES="/minio-data/"
# MinIO cli options.
MINIO_OPTS="--address :9091"

MINIO_ACCESS_KEY="mykey"
MINIO_SECRET_KEY="mysecret"

To verify the installation, check the version of the binary:

$ minio version
Version: 2019-06-27T21:13:50Z
Release-Tag: RELEASE.2019-06-27T21-13-50Z
Commit-ID: 36c19f1d653adf3ef70128eb3be1a35b6b032731
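With the unit file and the environment file in place, the service can be enabled and started the usual systemd way:

sudo systemctl daemon-reload
sudo systemctl enable minio
sudo systemctl start minio
sudo systemctl status minio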

For the complete configuration, check the role on GitHub.
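For the test in the next section, the datalake bucket and a small text file need to exist. One way to create them is with the MinIO client mc (a sketch, assuming mc is installed; newer mc versions use mc alias set instead of mc config host add):

# Register the local server and create a test bucket with one file
mc config host add myminio http://localhost:9091 mykey mysecret
mc mb myminio/datalake
printf 'Some text\nfor testing\n' > test.txt
mc cp test.txt myminio/datalake/test.txt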

Code

The important bit is setting the right environment variables. Make sure the following variables are set:

export HADOOP_HOME=/opt/hadoop
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin
export PATH=$PATH:$HADOOP_HOME/bin
export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native
export SPARK_DIST_CLASSPATH=$(hadoop classpath)

After setting the environment variables, we need to make sure we connect to the MinIO endpoint and set the credentials. Make sure path.style.access is set to true, because MinIO serves buckets under the endpoint path (http://localhost:9091/bucket) rather than as a virtual host.

import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = (
    SparkConf()
    .setAppName("Spark MinIO Test")
    # Point the S3A connector at the local MinIO server
    .set("spark.hadoop.fs.s3a.endpoint", "http://localhost:9091")
    # Credentials are read from the environment
    .set("spark.hadoop.fs.s3a.access.key", os.environ.get("MINIO_ACCESS_KEY"))
    .set("spark.hadoop.fs.s3a.secret.key", os.environ.get("MINIO_SECRET_KEY"))
    # MinIO buckets are addressed by path, not by virtual host
    .set("spark.hadoop.fs.s3a.path.style.access", "true")
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
)
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

Once this is done, we can simply access the bucket and read a text file (given that the bucket and text file exist), and we are able to write a dataframe to MinIO.

print(sc.wholeTextFiles('s3a://datalake/test.txt').collect())
# Returns: [('s3a://datalake/test.txt', 'Some text\nfor testing\n')]
path = "s3a://user-jitsejan/mario-colors-two/"
rdd = sc.parallelize([('Mario', 'Red'), ('Luigi', 'Green'), ('Princess', 'Pink')])
rdd.toDF(['name', 'color']).write.csv(path)
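To double-check the write, the freshly written CSV can simply be read back from MinIO:

# Read the CSV back from the same path and show its contents
df = sqlContext.read.csv(path)
df.show()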

Todo

Currently there seems to be an issue with reading small files: it gives a Parquet error saying the files are not big enough to read. It seems more like a library issue, so I should just make sure I only work on big data.

Credits

Thanks to atosatto for the Ansible role and MinIO for the great example.