At Predikto, we’re big fans of in-memory distributed processing for large datasets. Much of our processing happens inside Spark (speed + scale), and with the recently released DataSource API with JDBC connectivity, integrating with almost any datasource just got a lot easier. The Spark documentation covers the basics of the API and DataFrames, but there is surprisingly little information online about actually getting this feature to work.
TL;DR: Scroll to the bottom for the complete Gist.
In this example, I’ll cover PostgreSQL connectivity, but any datasource with a JDBC driver will work.
First, Spark needs the JDBC driver on its classpath. In PySpark, this can be done by setting SPARK_CLASSPATH before creating the SparkContext:
import os
os.environ['SPARK_CLASSPATH'] = "/path/to/driver/postgresql-9.3-1103.jdbc41.jar"
Once loaded, create your SparkContext as usual:
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext("local[*]", '')
sqlctx = SQLContext(sc)
Now we’re ready to load data using the DataSource API. If we don’t specify any criteria, the entire table is loaded into memory:
df = sqlctx.load(
    source="jdbc",
    url="jdbc:postgresql://<host>/<database>?user=<user>&password=<password>",
    dbtable="<schema>.<table>")
- source: “jdbc” specifies that we will be using the JDBC DataSource API.
- url: The JDBC connection URL for the database.
- dbtable: The JDBC table we will read from, or possibly a subquery (more about this below).
Using the above code, the ‘load’ call will execute a ‘SELECT * FROM <schema>.<table>’ immediately.
In some cases, we didn’t want an entire DB table loaded into memory, so it took a bit of digging to understand how the new API handles “where” clauses. They really act more like subqueries: anything that is valid in a ‘FROM’ clause will work.
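Under the hood, Spark essentially interpolates whatever you pass as dbtable into the FROM clause of the SELECT it issues, which is why an aliased subquery works. A minimal sketch of that behavior (the helper name and alias below are my own, not Spark’s):

```python
def jdbc_select(dbtable):
    # Mimics how the JDBC data source builds its query: whatever is
    # passed as `dbtable` lands in the FROM clause verbatim.
    return "SELECT * FROM {0}".format(dbtable)

# A plain table name produces a full-table scan:
print(jdbc_select("schema.users"))
# SELECT * FROM schema.users

# An aliased subquery lets us push filtering down to the database:
print(jdbc_select("(SELECT email FROM schema.users WHERE user_id <= 1000) AS users_subset"))
# SELECT * FROM (SELECT email FROM schema.users WHERE user_id <= 1000) AS users_subset
```

This also makes clear why the alias is mandatory: without it, the generated SQL would contain a bare parenthesized subquery in the FROM clause, which PostgreSQL rejects.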
query = "(SELECT email_address AS email FROM schema.users WHERE user_id <= 1000) AS <alias>"
df = sqlctx.load(
    source="jdbc",
    url="jdbc:postgresql://<host>/<database>?user=<user>&password=<password>",
    dbtable=query)
- query: This query contains our ‘WHERE’ clause. Note that you must specify an alias for this query.
Given the example above, Spark will consume a list of email addresses from our users table, for all users with an id <= 1000. Once we have a DataFrame in hand, we can process the data using the DataFrame API, convert it to an RDD, or run SparkSQL queries over it.
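For instance, we might derive the domain from each address. The per-row transform is plain Python; the Spark calls that would apply it are shown as comments since they need a live SparkContext (the `extract_domain` helper is a made-up example, not part of our pipeline):

```python
def extract_domain(email):
    # Plain-Python transform to apply per row: "a@Example.COM" -> "example.com"
    return email.split("@", 1)[-1].lower()

# With the `df` and `sqlctx` from the snippets above, we could either
# run SQL over the DataFrame after registering it as a temp table:
#   df.registerTempTable("emails")
#   sample = sqlctx.sql("SELECT email FROM emails LIMIT 10").collect()
# or drop down to the RDD API and map the transform over each row:
#   domains = df.map(lambda row: extract_domain(row.email)).distinct()
```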
Complete example as Gist: