Using SFTP with Spark


  1. Set up a simple SFTP server on your (Ubuntu) machine.
  2. Add a file with the following content:
$ cat /home/ftpuser/data/sample.psv
john|doe|35
jane|doe|32
  3. ZIP the PSV file.
$ zip /home/ftpuser/data/ /home/ftpuser/data/sample.psv
  4. Have Spark installed.
  5. Have the required JAR available in your Spark JAR directory.
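
The file creation and ZIP steps can also be scripted. The following is a minimal sketch, not part of the original setup: it assumes a temporary directory in place of /home/ftpuser/data, uses two sample rows, and the archive name sample.zip is hypothetical.

```python
import tempfile
import zipfile
from pathlib import Path

# Assumption: a temporary directory stands in for /home/ftpuser/data.
data_dir = Path(tempfile.mkdtemp())

# Write the pipe-separated sample file (sample rows).
psv_path = data_dir / "sample.psv"
psv_path.write_text("john|doe|35\njane|doe|32\n", encoding="utf-8")

# Compress it; "sample.zip" is a hypothetical archive name.
zip_path = data_dir / "sample.zip"
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
    # arcname stores only the file name, not the full directory path.
    archive.write(psv_path, arcname=psv_path.name)

# List the archive members to confirm what was stored.
with zipfile.ZipFile(zip_path) as archive:
    print(archive.namelist())
```

Storing the member under its bare file name keeps the archive layout independent of where it was created, which makes "first file in the archive" logic easier to reason about later.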

Setup context

In [2]:
# !wget
In [3]:
# !wget
In [4]:
from pyspark import SparkContext, SparkConf, SQLContext
import os

os.environ['HADOOP_HOME'] = '/opt/hadoop/'
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python3'
os.environ['PYSPARK_PYTHON'] = 'python3'
os.environ['LD_LIBRARY_PATH'] = '/opt/hadoop/lib/native'
os.environ['SPARK_DIST_CLASSPATH'] = "/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/share/hadoop/yarn:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*"
os.environ['SPARK_HOME'] = '/opt/spark/'

conf = (
    SparkConf()
    .setAppName("Spark SFTP Test")
    .set("spark.hadoop.fs.sftp.impl", "org.apache.hadoop.fs.sftp.SFTPFileSystem")
)
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

Read a file directly into a dataframe

In [5]:
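# Assumed: the format name and load path were lost from the original cell.
# The options below match the springml spark-sftp data source.
df = (
    sqlContext.read
    .format("com.springml.spark.sftp")  # assumed data source name
    .option("host", os.environ.get('FTP_HOST'))
    .option("username", os.environ.get('FTP_USER'))
    .option("password", os.environ.get('FTP_PASS'))
    .option("fileType", "csv")
    .option("delimiter", "|")
    .option("quote", "\"")
    .option("escape", "\\")
    .option("multiLine", "true")
    .option("inferSchema", "false")
    .option("header", "false")
    .load("/home/ftpuser/data/sample.psv")  # assumed path, from the setup step
)
df.show()
+----+---+---+
| _c0|_c1|_c2|
+----+---+---+
|john|doe| 35|
|jane|doe| 32|
+----+---+---+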

Read a file as binary file and transform into a dataframe

In [6]:
from pyspark.sql.types import StructType, StructField, StringType

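schema = StructType([
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("age", StringType())
])
file_path = 'data/sample.psv'

# Assumed: the read call and URL layout were lost from the original cell.
psv_df = sc\
    .binaryFiles("sftp://{FTP_USER}:{FTP_PASS}@{FTP_HOST}/".format(**os.environ) + file_path)\
    .map(lambda row: row[1].decode('utf-8').strip())\
    .flatMap(lambda row: str(row).split('\n'))\
    .map(lambda row: str(row).split('|'))\
    .toDF(schema)
psv_df.show()
+----------+---------+---+
|first_name|last_name|age|
+----------+---------+---+
|      john|      doe| 35|
|      jane|      doe| 32|
+----------+---------+---+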
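
The parsing steps in that chain (decode, split into lines, split each line on the pipe) can be checked locally without Spark. A minimal sketch, assuming a hand-made (path, bytes) pair as a stand-in for one record read as a binary file; the helper name parse_psv and the sample values are hypothetical:

```python
# Stand-in for one (path, bytes) pair, the shape produced when a whole file
# is read as binary. Path and content are made-up sample values.
record = ("data/sample.psv", b"john|doe|35\njane|doe|32\n")

def parse_psv(pair):
    """Decode the bytes, split the text into lines, then split each line on '|'."""
    _, content = pair
    text = content.decode("utf-8").strip()
    return [line.split("|") for line in text.split("\n")]

rows = parse_psv(record)
print(rows)
```

Each inner list corresponds to one dataframe row, so its length should match the number of fields in the schema.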

Extract a ZIP archive and parse the content into a dataframe

The following snippet extracts a ZIP file in memory and returns the content of the first file.

In [7]:
import io
import zipfile

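def zip_extract(row):
    # row is a (path, bytes) pair; only the archive bytes are needed.
    file_path, content = row
    with zipfile.ZipFile(io.BytesIO(content), "r") as zfile:
        files = zfile.namelist()
        # Read and decode the first file in the archive.
        return zfile.open(files[0]).read().decode("utf-8", errors='ignore')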

file_path = 'data/'

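# Assumed: the read call and URL layout were lost from the original cell.
psv_df = sc\
    .binaryFiles("sftp://{FTP_USER}:{FTP_PASS}@{FTP_HOST}/".format(**os.environ) + file_path)\
    .map(zip_extract)\
    .map(lambda row: row.strip())\
    .flatMap(lambda row: str(row).split('\n'))\
    .map(lambda row: str(row).split('|'))\
    .toDF(schema)
psv_df.show()
+----------+---------+---+
|first_name|last_name|age|
+----------+---------+---+
|      john|      doe| 35|
|      jane|      doe| 32|
+----------+---------+---+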
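
The in-memory extraction can be tried without an SFTP server or Spark. A minimal sketch, assuming an archive built on the fly with one member; the helper name first_file_text, the member name, and the rows are hypothetical sample values:

```python
import io
import zipfile

def first_file_text(content):
    """Return the decoded text of the first member of a ZIP given as bytes."""
    with zipfile.ZipFile(io.BytesIO(content), "r") as archive:
        first_name = archive.namelist()[0]
        return archive.read(first_name).decode("utf-8", errors="ignore")

# Build a small ZIP archive entirely in memory (sample member name and rows).
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("sample.psv", "john|doe|35\njane|doe|32\n")

# Extract the first member and apply the same line and field splitting.
text = first_file_text(buffer.getvalue())
rows = [line.split("|") for line in text.strip().split("\n")]
print(rows)
```

Only the first member is read, so an archive with several files would need a loop over namelist() instead.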