Integrating PySpark with Salesforce

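The recommended way to connect Spark to Salesforce is the spark-salesforce library. It does not work on its own: several dependency JARs must be on the classpath as well, including the Jackson libraries that provide XML support (jackson-dataformat-xml and jackson-core). Note that the _2.11 suffix of the connector is the Scala version it was built against, so it should match the Scala version of your Spark installation. Download all of the JARs: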

$ wget https://repo1.maven.org/maven2/com/springml/spark-salesforce_2.11/1.1.1/spark-salesforce_2.11-1.1.1.jar 
$ wget https://repo1.maven.org/maven2/com/springml/salesforce-wave-api/1.0.9/salesforce-wave-api-1.0.9.jar
$ wget https://repo1.maven.org/maven2/com/force/api/force-partner-api/40.0.0/force-partner-api-40.0.0.jar
$ wget https://repo1.maven.org/maven2/com/force/api/force-wsc/40.0.0/force-wsc-40.0.0.jar
$ wget https://repo1.maven.org/maven2/com/fasterxml/jackson/dataformat/jackson-dataformat-xml/2.10.3/jackson-dataformat-xml-2.10.3.jar
$ wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.10.3/jackson-core-2.10.3.jar
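The download URLs above all follow the standard Maven Central layout (group path, artifact, version, file name). As a minimal sketch, the same list can be generated from Maven coordinates in Python. The names maven_jar_url, DEPENDENCIES and download_all are hypothetical helpers introduced here for illustration, not part of any library:

```python
import os
import urllib.request

MAVEN_CENTRAL = "https://repo1.maven.org/maven2"

def maven_jar_url(group_id, artifact_id, version):
    """Build the Maven Central URL of a JAR from its coordinates (hypothetical helper)."""
    group_path = group_id.replace(".", "/")
    file_name = f"{artifact_id}-{version}.jar"
    return f"{MAVEN_CENTRAL}/{group_path}/{artifact_id}/{version}/{file_name}"

# Coordinates taken from the wget commands above
DEPENDENCIES = [
    ("com.springml", "spark-salesforce_2.11", "1.1.1"),
    ("com.springml", "salesforce-wave-api", "1.0.9"),
    ("com.force.api", "force-partner-api", "40.0.0"),
    ("com.force.api", "force-wsc", "40.0.0"),
    ("com.fasterxml.jackson.dataformat", "jackson-dataformat-xml", "2.10.3"),
    ("com.fasterxml.jackson.core", "jackson-core", "2.10.3"),
]

urls = [maven_jar_url(*dep) for dep in DEPENDENCIES]

def download_all(target_dir="."):
    """Download every JAR into target_dir, skipping files that already exist."""
    for url in urls:
        destination = os.path.join(target_dir, url.rsplit("/", 1)[-1])
        if not os.path.exists(destination):
            urllib.request.urlretrieve(url, destination)
```

Calling download_all() fetches the six files into the current directory, which is equivalent to running the wget commands by hand.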

The configuration is saved in config.ini with the following fields:

[salesforce]
username = [email protected]
password = securePassw0rd
token = sal3sforceT0ken

Loading the configuration is done using configparser:

from configparser import ConfigParser

config = ConfigParser()
config.read('config.ini')
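ConfigParser.read does not raise when the file is missing; it simply returns an empty list, so a typo in the path only surfaces later as a missing section. A minimal sketch of a stricter loader follows. The function name load_salesforce_config and the REQUIRED_KEYS tuple are assumptions made for this example. It also passes interpolation=None, which differs from the snippet above, so that a password containing a % character is read literally:

```python
from configparser import ConfigParser

# Keys the [salesforce] section is expected to contain (taken from config.ini above)
REQUIRED_KEYS = ("username", "password", "token")

def load_salesforce_config(path="config.ini"):
    """Read config.ini and fail early if the file or a required key is missing."""
    # interpolation=None keeps '%' characters in passwords literal
    config = ConfigParser(interpolation=None)
    # read() returns the list of files it managed to parse
    if not config.read(path):
        raise FileNotFoundError(f"Could not read {path}")
    if not config.has_section("salesforce"):
        raise KeyError("Missing [salesforce] section")
    missing = [key for key in REQUIRED_KEYS
               if not config.has_option("salesforce", key)]
    if missing:
        raise KeyError("Missing keys in [salesforce]: " + ", ".join(missing))
    return config
```

The returned object is a regular ConfigParser, so config.get('salesforce', 'username') works exactly as in the rest of this post.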

When creating the SparkSession, make sure the paths to the different JARs are set correctly:

from pyspark.sql import SparkSession

jars = [
    'spark-salesforce_2.11-1.1.1.jar',
    'salesforce-wave-api-1.0.9.jar',
    'force-partner-api-40.0.0.jar',
    'force-wsc-40.0.0.jar',
    'jackson-dataformat-xml-2.10.3.jar',
    'jackson-core-2.10.3.jar',
]
spark = (SparkSession
  .builder
  .appName("PySpark with Salesforce")
  .config("spark.driver.extraClassPath", ":".join(jars))
  .getOrCreate())  
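A wrong JAR path usually shows up only later, as a class-not-found error when the data source is loaded. The following sketch checks the files up front and builds the classpath string. The helper build_classpath and its jar_dir parameter are hypothetical. It joins with os.pathsep instead of a hard-coded ":", which is an assumption that the same script may also run on Windows:

```python
import os

def build_classpath(jar_names, jar_dir="."):
    """Return a classpath string for the given JARs, failing if any file is missing."""
    paths = [os.path.abspath(os.path.join(jar_dir, name)) for name in jar_names]
    missing = [path for path in paths if not os.path.isfile(path)]
    if missing:
        raise FileNotFoundError("Missing JARs: " + ", ".join(missing))
    # os.pathsep is ":" on Linux and macOS, ";" on Windows
    return os.pathsep.join(paths)
```

The result can be passed to .config("spark.driver.extraClassPath", build_classpath(jars)) in place of ":".join(jars).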

With the session created, we are ready to pull some data using a SOQL query:

soql = "SELECT name, industry, type, billingaddress, sic FROM account"

# The password option is the Salesforce password with the security token appended
password = config.get('salesforce', 'password') + config.get('salesforce', 'token')

df = (spark
  .read
  .format("com.springml.spark.salesforce")
  .option("username", config.get('salesforce', 'username'))
  .option("password", password)
  .option("soql", soql)
  .load())
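When several queries are run against the same account, the reader options can be collected once and reused. This is a minimal sketch under the assumption that the three options used above are all the connector needs; salesforce_reader_options is a hypothetical helper, and the demo values are placeholders:

```python
from configparser import ConfigParser

def salesforce_reader_options(config, soql):
    """Collect the reader options used above into a dict (hypothetical helper)."""
    username = config.get("salesforce", "username")
    password = config.get("salesforce", "password")
    token = config.get("salesforce", "token")
    return {
        "username": username,
        # password followed directly by the security token, as in the read above
        "password": password + token,
        "soql": soql,
    }

# Placeholder credentials, only to show the shape of the result
demo_config = ConfigParser()
demo_config.read_string(
    "[salesforce]\nusername = demo-user\npassword = secret\ntoken = TOKEN\n"
)
demo_options = salesforce_reader_options(demo_config, "SELECT name FROM account")

# With a live session, the dict can be passed to DataFrameReader.options:
# df = (spark.read
#   .format("com.springml.spark.salesforce")
#   .options(**demo_options)
#   .load())
```

Keeping the options in one place means the credentials are read from the config once, and only the SOQL string changes between queries.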