Monday, September 28, 2015

ETL with Apache Spark

Continuing from my previous post on Modern Data Warehouse Architecture, in this post I'll give an example of using the PySpark API from Apache Spark to write ETL jobs that offload the data warehouse.

Spark is lightning-fast at data processing and works well with the Hadoop ecosystem; you can read more about it at the Apache Spark home. For now, let's talk about the ETL job. In my example, I'll merge a parent table and a type 2 sub-dimension table from a MySQL database and load them into a single dimension table in Hive with dynamic partitions. When building a warehouse on Hive, it is advisable to avoid snowflaking and the unnecessary joins it brings, since each join creates additional map tasks. Just to raise the curiosity: the throughput of this example job on a standalone Spark deployment is 1M+ rows/min.


Here, I have an Employees table (300,024 rows) and a Salaries table (2,844,047 rows) as the two sources, where each employee's salary history is maintained in a type 2 fashion with 'from_date' and 'to_date' columns. I've included the schema details in the code comments, which is a good practice when writing ETL jobs. The target is a Hive table partitioned by year('to_date') from the Salaries table and by load date (the current date). Structuring the table with these dynamic partitions keeps the data well organized and also speeds up queries for current employees, given that the 'to_date' column holds the end date '9999-01-01' for all current records.
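The job assumes the target dimension table already exists in Hive. A minimal sketch of a DDL that matches the schema and partitioning used below (stored as ORC, partitioned by year and Load_date) might look like the following, run once through the HiveContext (or the Hive CLI):

hive_ctx.sql("""
    CREATE TABLE IF NOT EXISTS EMPLOYEES.EMPLOYEE_DIM (
        emp_no      INT,
        birth_date  DATE,
        first_name  STRING,
        last_name   STRING,
        gender      STRING,
        hire_date   DATE,
        salary      INT,
        from_date   DATE,
        to_date     DATE
    )
    -- Partition columns are declared separately from the data columns
    PARTITIONED BY (year INT, Load_date DATE)
    STORED AS ORC
""")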

The logic is a simple join between the two tables plus the addition of load_date and year columns, followed by a dynamic partition insert into a Hive table. Here's what the DAG looks like:
[Figure: DAG of the job as visualized in the Spark UI]

Since version 1.4, the Spark UI visualizes the physical execution of a job as a Directed Acyclic Graph (figure above), much like an ETL workflow. For this example, I've built Spark 1.5 with Hive support against Hadoop 2.6.0.

Below is the sample code for our job. For ease of use, I've hard-coded the runtime parameters within the job; ideally they would be passed in at run time.
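If you do want to parameterize them, one simple approach (just a sketch; the job below keeps the hard-coded values) is to read the credentials from the command-line arguments that spark-submit passes through to the script:

import sys

# Hypothetical invocation: spark-submit mysql_to_hive_etl.py <mysql_user> <mysql_password>
MYSQL_USERNAME = sys.argv[1]
MYSQL_PASSWORD = sys.argv[2]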

Code: MySQL to Hive ETL Job


__author__ = 'udaysharma'
# File Name: mysql_to_hive_etl.py
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, HiveContext
from pyspark.sql import functions as F
# Define database connection parameters
MYSQL_DRIVER_PATH = "/usr/local/spark/python/lib/mysql-connector-java-5.1.36-bin.jar"
MYSQL_USERNAME = '<USER_NAME>'
MYSQL_PASSWORD = '********'
MYSQL_CONNECTION_URL = "jdbc:mysql://localhost:3306/employees?user=" + MYSQL_USERNAME+"&password="+MYSQL_PASSWORD
# Define Spark configuration
conf = SparkConf()
conf.setMaster("spark://Box.local:7077")
conf.setAppName("MySQL_import")
conf.set("spark.executor.memory", "1g")
# Initialize a SparkContext and SQLContext
sc = SparkContext(conf=conf)
sql_ctx = SQLContext(sc)
# Initialize hive context
hive_ctx = HiveContext(sc)
# Source 1 Type: MYSQL
# Schema Name : EMPLOYEE
# Table Name : EMPLOYEES
# +-------------+---------------+-------------+
# | COLUMN NAME | DATA TYPE     | CONSTRAINTS |
# +-------------+---------------+-------------+
# | EMP_NO      | INT           | PRIMARY KEY |
# | BIRTH_DATE  | DATE          |             |
# | FIRST_NAME  | VARCHAR(14)   |             |
# | LAST_NAME   | VARCHAR(16)   |             |
# | GENDER      | ENUM('M','F') |             |
# | HIRE_DATE   | DATE          |             |
# +-------------+---------------+-------------+
df_employees = sql_ctx.load(
    source="jdbc",
    path=MYSQL_DRIVER_PATH,
    driver='com.mysql.jdbc.Driver',
    url=MYSQL_CONNECTION_URL,
    dbtable="employees")
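# Side note: SQLContext.load() is deprecated as of Spark 1.4; an equivalent call
# using the newer DataFrameReader API would look roughly like this (the job here
# keeps the original load() calls):
# df_employees = sql_ctx.read.format("jdbc").options(
#     url=MYSQL_CONNECTION_URL,
#     driver='com.mysql.jdbc.Driver',
#     dbtable="employees").load()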
# Source 2 Type : MYSQL
# Schema Name : EMPLOYEE
# Table Name : SALARIES
# +-------------+------+-------------+
# | COLUMN NAME | TYPE | CONSTRAINTS |
# +-------------+------+-------------+
# | EMP_NO      | INT  | PRIMARY KEY |
# | SALARY      | INT  |             |
# | FROM_DATE   | DATE | PRIMARY KEY |
# | TO_DATE     | DATE |             |
# +-------------+------+-------------+
df_salaries = sql_ctx.load(
    source="jdbc",
    path=MYSQL_DRIVER_PATH,
    driver='com.mysql.jdbc.Driver',
    url=MYSQL_CONNECTION_URL,
    dbtable="salaries")
# Perform an INNER JOIN of the two data frames on the EMP_NO column
# As of Spark 1.4, joining on a column name no longer duplicates that column in the result
df_emp_sal_join = df_employees.join(df_salaries, "emp_no").select("emp_no", "birth_date", "first_name",
                                                                  "last_name", "gender", "hire_date",
                                                                  "salary", "from_date", "to_date")
# Adding a column 'year' to the data frame for partitioning the hive table
df_add_year = df_emp_sal_join.withColumn('year', F.year(df_emp_sal_join.to_date))
# Adding a load date column to the data frame
df_final = df_add_year.withColumn('Load_date', F.current_date())
# repartition() returns a new DataFrame, so reassign the result
df_final = df_final.repartition(10)
# Registering data frame as a temp table for SparkSQL
hive_ctx.registerDataFrameAsTable(df_final, "EMP_TEMP")
# Target Type: APACHE HIVE
# Database : EMPLOYEES
# Table Name : EMPLOYEE_DIM
# +-------------+--------+-----------+
# | COLUMN NAME | TYPE   | PARTITION |
# +-------------+--------+-----------+
# | EMP_NO      | INT    |           |
# | BIRTH_DATE  | DATE   |           |
# | FIRST_NAME  | STRING |           |
# | LAST_NAME   | STRING |           |
# | GENDER      | STRING |           |
# | HIRE_DATE   | DATE   |           |
# | SALARY      | INT    |           |
# | FROM_DATE   | DATE   |           |
# | TO_DATE     | DATE   |           |
# | YEAR        | INT    | PRIMARY   |
# | LOAD_DATE   | DATE   | SUB       |
# +-------------+--------+-----------+
# Storage Format: ORC
# Hive requires dynamic partitioning to be enabled (and nonstrict mode, since
# both partition columns here are dynamic)
hive_ctx.setConf("hive.exec.dynamic.partition", "true")
hive_ctx.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
# Inserting data into the target table
hive_ctx.sql("INSERT OVERWRITE TABLE EMPLOYEES.EMPLOYEE_DIM PARTITION (year, Load_date) \
              SELECT EMP_NO, BIRTH_DATE, FIRST_NAME, LAST_NAME, GENDER, HIRE_DATE, \
              SALARY, FROM_DATE, TO_DATE, year, Load_date FROM EMP_TEMP")
Since we already have the required configuration declared within our code, we can run this job simply by calling:
spark-submit mysql_to_hive_etl.py
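
Depending on how the cluster picks up the JDBC driver, you may also need to ship the MySQL connector jar explicitly, for example:

spark-submit --jars /usr/local/spark/python/lib/mysql-connector-java-5.1.36-bin.jar mysql_to_hive_etl.py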

Once the job has run, our target table has 2,844,047 rows as expected, and the partitions look like this:

[Screenshots: partition listing of the EMPLOYEE_DIM table, showing the year/Load_date partition directories]
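
You can also sanity-check the result from the HiveContext itself, for example:

hive_ctx.sql("SELECT COUNT(*) FROM EMPLOYEES.EMPLOYEE_DIM").show()
hive_ctx.sql("SHOW PARTITIONS EMPLOYEES.EMPLOYEE_DIM").show()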

What's amazing is that this whole process finishes within 2-3 minutes, the majority of which is spent on the insert; in other words, we get a throughput of 1M+ rows/min into Hive.

Boo-yeah!