Using the Spark Datasource API to access a Database

At Predikto, we’re big fans of in-memory distributed processing for large datasets. Much of our processing happens inside Spark (speed + scale), and with the recently released Datasource API and its JDBC connectivity, integrating with any datasource got a lot easier. The Spark documentation covers the basics of the API and DataFrames, but there is surprisingly little information on the internet about actually getting this feature to work.

TL;DR: scroll to the bottom for the complete Gist.

In this example, I’ll cover PostgreSQL connectivity. Really, any JDBC-driver-supported datasource will work.

First, Spark needs to have the JDBC driver added to its classpath:

import os
os.environ['SPARK_CLASSPATH'] = "/path/to/driver/postgresql-9.3-1103.jdbc41.jar"

With the driver on the classpath, create your SparkContext as usual:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
 
sc = SparkContext("local[*]", '')   # local mode, using all available cores
sqlctx = SQLContext(sc)             # SQLContext exposes the DataFrame / Datasource API

Now, we’re ready to load data using the DataSource API. If we don’t specify any criteria, the entire table is loaded into memory:

df = sqlctx.load(
  source="jdbc", 
  url="jdbc:postgresql://<host>/<database>?user=<username>&password=<password>",
  dbtable="<schema>.<table>")
  1. source: “jdbc” specifies that we will be using the JDBC DataSource API.
  2. url: The JDBC URL of the DB to connect to.
  3. dbtable: The JDBC table we will read from, and possibly a subquery (more on this below).

Using the above code, the ‘load’ call will execute a ‘SELECT * FROM <schema>.<table>’ immediately.
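
A quick way to sanity-check the result is to inspect the DataFrame directly (standard DataFrame calls, shown here purely for illustration):

# Inspect the DataFrame returned by load()
df.printSchema()     # column names and types inferred from the JDBC metadata
print(df.count())    # number of rows in the table
df.show(5)           # peek at the first five rows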

In some cases we didn’t want an entire DB table loaded into memory, so it took a bit of digging to understand how the new API handles ‘WHERE’ clauses. They really behave more like subqueries: anything that is valid in a ‘FROM’ clause will work.

query = "(SELECT email_address as email from schema.users WHERE user_id <= 1000) as <alias>"

df = sqlctx.load(
  source="jdbc", 
  url="jdbc:postgresql://<host>/<database>?user=<username>&password=<password>",
  dbtable=query)
  1.  query: This query contains our ‘WHERE’ clause. Note that you must specify an alias for this query.

Given the example above, Spark will consume a list of email addresses from our users table for all users with an id <= 1000. Once we have a DataFrame in hand, we can process the data using the DataFrame API, convert it to an RDD, or run Spark SQL queries over the data.
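
For example, the DataFrame can be registered as a temporary table for Spark SQL, or dropped down to an RDD for arbitrary transformations. A minimal sketch, assuming the email DataFrame from the subquery above (the "emails" temp table name is just an illustrative choice):

# Run Spark SQL over the loaded DataFrame
df.registerTempTable("emails")
gmail = sqlctx.sql("SELECT email FROM emails WHERE email LIKE '%@gmail.com'")
print(gmail.count())

# Or work with the underlying RDD directly
email_rdd = df.rdd.map(lambda row: row.email.lower())
print(email_rdd.take(5))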

Complete example as Gist:
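
Putting the pieces together, a self-contained sketch along the lines of the steps above (the connection details, schema, and alias are placeholders to fill in for your own environment):

import os

# The JDBC driver must be on the classpath before the SparkContext is created
os.environ['SPARK_CLASSPATH'] = "/path/to/driver/postgresql-9.3-1103.jdbc41.jar"

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local[*]", '')
sqlctx = SQLContext(sc)

# Subquery with a WHERE clause; the alias is required
query = ("(SELECT email_address as email FROM schema.users "
         "WHERE user_id <= 1000) as users_subquery")

df = sqlctx.load(
    source="jdbc",
    url="jdbc:postgresql://<host>/<database>?user=<username>&password=<password>",
    dbtable=query)

print(df.count())
df.show(5)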