Continuing from my previous post on Modern Data Warehouse Architecture, in this post I'll walk through an example of using the PySpark API from Apache Spark to write an ETL job that offloads the data warehouse.
Spark is lightning-fast at data processing and works well with the Hadoop ecosystem; you can read more about it at the Apache Spark home page. For now, let's talk about the ETL job. In my example, I'll merge a parent table and a type 2 sub-dimension table from a MySQL database and load them into a single dimension table in Hive with dynamic partitions. When building a warehouse on Hive, it is advisable to avoid snowflaking, since each additional join spawns extra MapReduce work for your queries. To whet your curiosity: on a standalone Spark deployment, this example job achieves a throughput of 1M+ rows/min.
Here, I have an Employees table (300,024 rows) and a Salaries table (2,844,047 rows) as the two sources, where each employee's salary history is maintained in a type 2 fashion with 'from_date' and 'to_date' columns. I've included the schema details in the code comments, which I consider a best practice when coding ETL jobs. The target is a Hive table partitioned by year('to_date') from the Salaries table and by the load date (the current date). Structuring the table with these dynamic partitions keeps the data well organized and also speeds up queries for current employees, given that the 'to_date' column is set to '9999-01-01' for all current records.
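The DDL for the target table isn't shown in this post, but based on the schema and partitioning just described it would look roughly like the sketch below (names and types follow the schema comments in the job further down; adapt them to your environment). It can be run from the Hive shell or through the HiveContext:

-- Sketch of the target dimension table (actual DDL may differ)
CREATE TABLE IF NOT EXISTS employees.employee_dim (
    emp_no     INT,
    birth_date DATE,
    first_name STRING,
    last_name  STRING,
    gender     STRING,
    hire_date  DATE,
    salary     INT,
    from_date  DATE,
    to_date    DATE
)
PARTITIONED BY (year INT, load_date DATE)
STORED AS ORC;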
The logic is a simple join between the two tables, plus the addition of the load_date and year columns, followed by a dynamic-partition insert into a Hive table. Here's what the DAG looks like:
Since version 1.4, the Spark UI visualizes the physical execution of a job as a Directed Acyclic Graph (figure above), much like an ETL workflow. For this example, I've built Spark 1.5 with Hive support against Hadoop 2.6.0.
Below is the sample code for our job. For ease of reading, I've hard-coded the runtime parameters within the job; ideally they would be parameterized.
Code: MySQL to Hive ETL Job
__author__ = 'udaysharma'
# File Name: mysql_to_hive_etl.py
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, HiveContext
from pyspark.sql import functions as F
# Define database connection parameters
MYSQL_DRIVER_PATH = "/usr/local/spark/python/lib/mysql-connector-java-5.1.36-bin.jar"
MYSQL_USERNAME = '<USER_NAME>'
MYSQL_PASSWORD = '********'
MYSQL_CONNECTION_URL = "jdbc:mysql://localhost:3306/employees?user=" + MYSQL_USERNAME + "&password=" + MYSQL_PASSWORD

# Define Spark configuration
conf = SparkConf()
conf.setMaster("spark://Box.local:7077")
conf.setAppName("MySQL_import")
conf.set("spark.executor.memory", "1g")

# Initialize a SparkContext and SQLContext
sc = SparkContext(conf=conf)
sql_ctx = SQLContext(sc)

# Initialize the Hive context
hive_ctx = HiveContext(sc)

# Source 1 Type : MYSQL
# Schema Name   : EMPLOYEE
# Table Name    : EMPLOYEES
# +-------------+---------------+-------------+
# | COLUMN NAME | DATA TYPE     | CONSTRAINTS |
# +-------------+---------------+-------------+
# | EMP_NO      | INT           | PRIMARY KEY |
# | BIRTH_DATE  | DATE          |             |
# | FIRST_NAME  | VARCHAR(14)   |             |
# | LAST_NAME   | VARCHAR(16)   |             |
# | GENDER      | ENUM('M'/'F') |             |
# | HIRE_DATE   | DATE          |             |
# +-------------+---------------+-------------+
df_employees = sql_ctx.load(
    source="jdbc",
    path=MYSQL_DRIVER_PATH,
    driver='com.mysql.jdbc.Driver',
    url=MYSQL_CONNECTION_URL,
    dbtable="employees")
# Source 2 Type : MYSQL
# Schema Name   : EMPLOYEE
# Table Name    : SALARIES
# +-------------+------+-------------+
# | COLUMN NAME | TYPE | CONSTRAINTS |
# +-------------+------+-------------+
# | EMP_NO      | INT  | PRIMARY KEY |
# | SALARY      | INT  |             |
# | FROM_DATE   | DATE | PRIMARY KEY |
# | TO_DATE     | DATE |             |
# +-------------+------+-------------+
df_salaries = sql_ctx.load(
    source="jdbc",
    path=MYSQL_DRIVER_PATH,
    driver='com.mysql.jdbc.Driver',
    url=MYSQL_CONNECTION_URL,
    dbtable="salaries")
# Perform an INNER JOIN of the two data frames on the EMP_NO column
# As of Spark 1.4 you don't have to worry about duplicate columns in the join result
df_emp_sal_join = df_employees.join(df_salaries, "emp_no").select("emp_no", "birth_date", "first_name",
                                                                  "last_name", "gender", "hire_date",
                                                                  "salary", "from_date", "to_date")

# Add a 'year' column to the data frame for partitioning the Hive table
df_add_year = df_emp_sal_join.withColumn('year', F.year(df_emp_sal_join.to_date))

# Add a load date column to the data frame
df_final = df_add_year.withColumn('Load_date', F.current_date())

# repartition() returns a new DataFrame, so reassign it
df_final = df_final.repartition(10)

# Register the data frame as a temp table for Spark SQL
hive_ctx.registerDataFrameAsTable(df_final, "EMP_TEMP")
# Target Type    : APACHE HIVE
# Database       : EMPLOYEES
# Table Name     : EMPLOYEE_DIM
# +-------------+--------+-----------+
# | COLUMN NAME | TYPE   | PARTITION |
# +-------------+--------+-----------+
# | EMP_NO      | INT    |           |
# | BIRTH_DATE  | DATE   |           |
# | FIRST_NAME  | STRING |           |
# | LAST_NAME   | STRING |           |
# | GENDER      | STRING |           |
# | HIRE_DATE   | DATE   |           |
# | SALARY      | INT    |           |
# | FROM_DATE   | DATE   |           |
# | TO_DATE     | DATE   |           |
# | YEAR        | INT    | PRIMARY   |
# | LOAD_DATE   | DATE   | SUB       |
# +-------------+--------+-----------+
# Storage Format: ORC

# Insert data into the target table using dynamic partitions
hive_ctx.sql("INSERT OVERWRITE TABLE EMPLOYEES.EMPLOYEE_DIM PARTITION (year, Load_date) \
              SELECT EMP_NO, BIRTH_DATE, FIRST_NAME, LAST_NAME, GENDER, HIRE_DATE, \
              SALARY, FROM_DATE, TO_DATE, year, Load_date FROM EMP_TEMP")
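One caveat: because both partition columns in the INSERT above are dynamic, Hive has to allow non-strict dynamic partitioning. If your Hive installation isn't already configured that way, one way to set it from the job itself (before the insert) is shown below; treat it as an optional tweak rather than part of the original script.

# Optional: enable fully dynamic partitions for the INSERT above,
# in case the Hive configuration doesn't already allow it
hive_ctx.setConf("hive.exec.dynamic.partition", "true")
hive_ctx.setConf("hive.exec.dynamic.partition.mode", "nonstrict")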
Since the required configuration is already declared within the code, we can simply run this job by calling:
spark-submit mysql_to_hive_etl.py
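If you'd rather not hard-code the master URL, executor memory and MySQL connector jar, the same settings can be passed on the command line instead (keep in mind that values set on SparkConf inside the script take precedence over spark-submit flags). An illustrative invocation, using the paths from the script above, might be:

spark-submit --master spark://Box.local:7077 \
             --executor-memory 1g \
             --jars /usr/local/spark/python/lib/mysql-connector-java-5.1.36-bin.jar \
             --driver-class-path /usr/local/spark/python/lib/mysql-connector-java-5.1.36-bin.jar \
             mysql_to_hive_etl.py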
Once the job has run, our target table holds 2,844,047 rows as expected, and the partitions look like this:
What's amazing is that this whole process finishes within 2-3 minutes, the majority of which is spent on the insert, i.e. we get a throughput of 1M+ rows/min into Hive.
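As a final sanity check, the row count and the partition layout can also be inspected from the job itself through the same HiveContext; a small, illustrative snippet:

# List the partitions created by the dynamic-partition insert
hive_ctx.sql("SHOW PARTITIONS EMPLOYEES.EMPLOYEE_DIM").show(100, truncate=False)

# Confirm the total row count in the target table
hive_ctx.sql("SELECT COUNT(*) AS row_count FROM EMPLOYEES.EMPLOYEE_DIM").show()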
Boo-yeah!