Using SFTP with Spark

Prerequisites

  1. Set up a simple SFTP server on your (Ubuntu) machine (a quick connectivity check follows this list).
  2. Add a file with the following content:
$ cat /home/ftpuser/data/sample.psv
john|doe|35
jane|doe|32
  3. ZIP the PSV file:
$ zip /home/ftpuser/data/testpsv.zip /home/ftpuser/data/sample.psv
  4. Have Spark installed.
  5. Have the JAR from https://github.com/springml/spark-sftp available in your Spark JAR directory.
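
Before wiring Spark up, it is worth confirming the server is reachable. A minimal connectivity check, assuming paramiko is installed and that the SFTP user is chrooted to its home directory (which is why the paths below start at data/):

import os
import paramiko

# Assumes FTP_HOST, FTP_USER and FTP_PASS are exported, as in the cells below.
transport = paramiko.Transport((os.environ.get('FTP_HOST'), 22))
transport.connect(username=os.environ.get('FTP_USER'), password=os.environ.get('FTP_PASS'))
sftp = paramiko.SFTPClient.from_transport(transport)
print(sftp.listdir('data'))  # expect: ['sample.psv', 'testpsv.zip']
sftp.close()
transport.close()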

Set up the Spark context

In [2]:
# !wget https://repo1.maven.org/maven2/com/springml/sftp.client/1.0.3/sftp.client-1.0.3.jar
In [3]:
# !wget https://repo1.maven.org/maven2/com/springml/spark-sftp_2.10/1.0.2/spark-sftp_2.10-1.0.2.jar
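
As an alternative to copying the connector into Spark's JAR directory (step 5 above), the two JARs downloaded by the wget commands can be shipped with the job through Spark's spark.jars setting. A sketch, assuming the files sit in the current working directory:

from pyspark import SparkConf

# spark.jars takes a comma-separated list of JAR paths to put on the
# driver and executor classpaths; the file names below match the wget
# downloads above and are assumed to be in the working directory.
conf = (
    SparkConf()
    .setAppName("Spark SFTP Test")
    .set("spark.jars", "sftp.client-1.0.3.jar,spark-sftp_2.10-1.0.2.jar")
)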
In [4]:
from pyspark import SparkContext, SparkConf, SQLContext
import os

os.environ['HADOOP_HOME'] = '/opt/hadoop/'
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python3'
os.environ['PYSPARK_PYTHON'] = 'python3'
os.environ['LD_LIBRARY_PATH'] = '/opt/hadoop/lib/native'
os.environ['SPARK_DIST_CLASSPATH'] = "/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/share/hadoop/yarn:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*"
os.environ['SPARK_HOME'] = '/opt/spark/'

conf = (
    SparkConf()
    .setAppName("Spark SFTP Test")
    .set("spark.hadoop.fs.sftp.impl", "org.apache.hadoop.fs.sftp.SFTPFileSystem")
)
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

Read a file directly into a dataframe

In [5]:
df = sqlContext\
    .read\
    .format("com.springml.spark.sftp")\
    .option("host", os.environ.get('FTP_HOST'))\
    .option("username", os.environ.get('FTP_USER'))\
    .option("password", os.environ.get('FTP_PASS'))\
    .option("fileType", "csv")\
    .option("delimiter", "|")\
    .option("quote", "\"")\
    .option("escape", "\\")\
    .option("multiLine", "true")\
    .option("inferSchema", "false")\
    .option("header", "false")\
    .load("/data/sample.psv")
df.show()
+----+---+---+
| _c0|_c1|_c2|
+----+---+---+
|john|doe| 35|
|jane|doe| 32|
+----+---+---+
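
The connector can also write a dataframe back to the server. A minimal sketch, assuming the same credentials have write access; the target path sample_out.psv is hypothetical:

df.write\
    .format("com.springml.spark.sftp")\
    .option("host", os.environ.get('FTP_HOST'))\
    .option("username", os.environ.get('FTP_USER'))\
    .option("password", os.environ.get('FTP_PASS'))\
    .option("fileType", "csv")\
    .option("delimiter", "|")\
    .save("/data/sample_out.psv")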

Read a file as binary and transform it into a dataframe

In [6]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("age", StringType())
])
file_path = 'data/sample.psv'

psv_df = sc\
    .binaryFiles(f"sftp://{os.environ.get('FTP_USER')}:{os.environ.get('FTP_PASS')}@{os.environ.get('FTP_HOST')}/{file_path}")\
    .map(lambda row: row[1].decode('utf-8').strip())\
    .flatMap(lambda row: row.split('\n'))\
    .map(lambda row: row.split('|'))\
    .toDF(schema=schema)
psv_df.show()
+----------+---------+---+
|first_name|last_name|age|
+----------+---------+---+
|      john|      doe| 35|
|      jane|      doe| 32|
+----------+---------+---+
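
The plain split('|') above is fine for this sample but breaks on fields that contain a quoted delimiter. Python's csv module handles quoting correctly; a sketch of the same pipeline using it, reusing sc, schema and the environment variables from above:

import csv

def parse_psv(text):
    # csv.reader accepts any iterable of lines; quotechar keeps
    # pipes inside quoted fields from being treated as delimiters.
    return list(csv.reader(text.splitlines(), delimiter='|', quotechar='"'))

sc\
    .binaryFiles(f"sftp://{os.environ.get('FTP_USER')}:{os.environ.get('FTP_PASS')}@{os.environ.get('FTP_HOST')}/data/sample.psv")\
    .flatMap(lambda row: parse_psv(row[1].decode('utf-8').strip()))\
    .toDF(schema=schema)\
    .show()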

Extract a ZIP archive and parse the content into a dataframe

The following snippet extracts a ZIP file in memory and returns the content of the first file.

In [7]:
import io
import zipfile

def zip_extract(row):
    # row is a (path, bytes) pair from binaryFiles: unzip the bytes in
    # memory and return the decoded content of the first archive member.
    file_path, content = row
    zfile = zipfile.ZipFile(io.BytesIO(content), "r")
    first_member = zfile.namelist()[0]
    return zfile.open(first_member).read().decode("utf-8", errors="ignore")

file_path = 'data/testpsv.zip'

psv_df = sc\
    .binaryFiles(f"sftp://{os.environ.get('FTP_USER')}:{os.environ.get('FTP_PASS')}@{os.environ.get('FTP_HOST')}/{file_path}")\
    .map(zip_extract)\
    .map(lambda row: row.strip())\
    .flatMap(lambda row: row.split('\n'))\
    .map(lambda row: row.split('|'))\
    .toDF(schema=schema)
psv_df.show()
+----------+---------+---+
|first_name|last_name|age|
+----------+---------+---+
|      john|      doe| 35|
|      jane|      doe| 32|
+----------+---------+---+
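
If an archive holds more than one member, the same pattern extends by returning every member's content and flattening. A sketch, reusing sc, schema and the io/zipfile imports from above:

def zip_extract_all(row):
    # Return the decoded content of every member of the archive.
    _, content = row
    zfile = zipfile.ZipFile(io.BytesIO(content), "r")
    return [zfile.open(name).read().decode("utf-8", errors="ignore")
            for name in zfile.namelist()]

sc\
    .binaryFiles(f"sftp://{os.environ.get('FTP_USER')}:{os.environ.get('FTP_PASS')}@{os.environ.get('FTP_HOST')}/data/testpsv.zip")\
    .flatMap(zip_extract_all)\
    .flatMap(lambda row: row.strip().split('\n'))\
    .map(lambda row: row.split('|'))\
    .toDF(schema=schema)\
    .show()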