Monday, September 28, 2015

ETL with Apache Spark

Continuing from my previous post on Modern Data Warehouse Architecture, in this post I'll give an example of using the PySpark API from Apache Spark to write ETL jobs that offload the data warehouse.

Spark is lightning-fast at data processing and works well with the Hadoop ecosystem; you can read more about Spark at the Apache Spark home page. For now, let's talk about the ETL job. In my example, I'll merge a parent table and a sub-dimension (type 2) table from a MySQL database and load them into a single dimension table in Hive with dynamic partitions. When building a warehouse on Hive, it is advisable to avoid snowflaking and the unnecessary joins it brings, since each join adds its own map tasks. Just to raise the curiosity: the throughput on a standalone Spark deployment for this example job is 1M+ rows/min.


Here, I have an Employees table (300,024 rows) and a Salaries table (2,844,047 rows) as the two sources, where each employee's salary history is maintained in a type 2 fashion with 'from_date' and 'to_date' columns. I've included the schema details in the comments as a best practice for coding ETL jobs. The target is a Hive table partitioned by year('to_date') from the Salaries table and by load date set to the current date. Structuring the table with these dynamic partitions organizes the data well and also speeds up queries for current employees, given that the 'to_date' column holds the end date '9999-01-01' for all current records.
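To make the target concrete, here is a rough sketch of how such a partitioned Hive table could be declared from PySpark. The table name 'employee_dim' and the column list are assumptions for illustration (mirroring the MySQL 'employees' sample schema), not the exact DDL used in the job:

# Sketch: create the partitioned Hive target table (names are assumed).
from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName="create_employee_dim")
sqlContext = HiveContext(sc)

sqlContext.sql("""
    CREATE TABLE IF NOT EXISTS employee_dim (
        emp_no     INT,
        birth_date DATE,
        first_name STRING,
        last_name  STRING,
        gender     STRING,
        hire_date  DATE,
        salary     INT,
        from_date  DATE,
        to_date    DATE
    )
    PARTITIONED BY (year INT, load_date STRING)
""")

sc.stop()

Keeping load_date as a string partition column is a simplification here; it makes the dynamic partition insert straightforward.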

The logic is a simple join between the two tables plus the addition of load_date and year columns, followed by a dynamic partition insert into a Hive table. Here's what the DAG looks like:
[Figure: Spark UI DAG visualization of the ETL job]

Since version 1.4, the Spark UI visualizes the physical execution of a job as a Directed Acyclic Graph (figure above), much like an ETL workflow. For this example, I've built Spark 1.5 with Hive support against Hadoop 2.6.0.
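(If you are building Spark from source, a Hive-enabled build for this setup can be produced with something along the lines of: build/mvn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests clean package. The exact profiles here are an assumption and depend on your environment.)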

Below is the sample code for our job. For ease of use, I've given the runtime parameters within the job; ideally they would be parameterized.

Code: MySQL to Hive ETL Job


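A minimal sketch of how such a job can be written with the HiveContext and DataFrame JDBC APIs follows; the JDBC URL, credentials, table names and the 'employee_dim' target table (from the DDL sketch above) are assumptions for illustration:

# mysql_to_hive_etl.py -- minimal sketch; names and credentials are assumed.
from datetime import date

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

conf = SparkConf().setAppName("mysql_to_hive_etl")
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# Runtime parameters kept inside the job for ease of use;
# ideally these would be passed in from outside.
MYSQL_URL = "jdbc:mysql://localhost:3306/employees?user=demo&password=demo"
LOAD_DATE = date.today().isoformat()

# Source 1: parent dimension (Employees, ~300K rows)
df_employees = sqlContext.read.format("jdbc").options(
    url=MYSQL_URL,
    driver="com.mysql.jdbc.Driver",
    dbtable="employees").load()

# Source 2: type 2 sub-dimension (Salaries, ~2.8M rows),
# salary history tracked with 'from_date' and 'to_date'
df_salaries = sqlContext.read.format("jdbc").options(
    url=MYSQL_URL,
    driver="com.mysql.jdbc.Driver",
    dbtable="salaries").load()

df_employees.registerTempTable("employees")
df_salaries.registerTempTable("salaries")

# Enable dynamic partition inserts on the Hive side
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

# Join the two sources, derive the 'year' and 'load_date' partition
# columns, and insert into the partitioned Hive table
sqlContext.sql("""
    INSERT INTO TABLE employee_dim PARTITION (year, load_date)
    SELECT e.emp_no, e.birth_date, e.first_name, e.last_name,
           e.gender, e.hire_date,
           s.salary, s.from_date, s.to_date,
           year(s.to_date) AS year,
           '{0}'           AS load_date
    FROM employees e
    JOIN salaries s ON e.emp_no = s.emp_no
""".format(LOAD_DATE))

sc.stop()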
Since we already have the required configuration declared within the code, we can simply run this job by calling:
spark-submit mysql_to_hive_etl.py
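One environment-specific caveat (an assumption about your setup, not part of the job itself): the jdbc data source needs the MySQL connector jar on the classpath, so in practice the call may look more like

spark-submit --driver-class-path /path/to/mysql-connector-java.jar mysql_to_hive_etl.py

where the jar path is a placeholder for wherever the connector lives on your machine.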

Once the job is run, our target table will have 2,844,047 rows as expected, and the partitions will look like:

[Screenshots: partition listings of the target Hive table]

What's amazing is that this whole process finishes within 2-3 minutes, and the majority of that time is spent on the insert, i.e. we get a throughput of 1M+ rows/min into Hive.

Boo-yeah!

2 comments:

  1. A great write-up!
    However, all you mentioned here is just a simple data merge and migration; I am really interested in how to do data cleaning systematically.
    Thanks!

  2. Thanks for reading! Yes, this was just for testing the waters. I am working on a few examples of loading JSON into Hive/Cassandra after some data processing and filtering, and shall publish a post about it.
