
MapReduce, MRJob & AWS EMR Pointers

Over the last couple of weeks, I’ve been playing around with MapReduce, MRJob and AWS to answer some questions about event data. Granted, this is definitely more data engineering focused than data science, but using these tools can be very beneficial if you are analyzing a ton of data (especially event log data).

This is more of an overview with a few lessons learned on how to set up a MapReduce job using MRJob and AWS EMR. This post focuses more on process and less on the script logic.

First, What is MapReduce (MR)?
MR is a programming model, used especially for large amounts of data, that makes it easy to apply some type of filtering and sorting to the data and then condense it into a result. It was born from similar concepts used in functional programming.

Map = procedure to filter and sort
Reduce = procedure to condense and summarize

Word count is typically used as the “Hello World” of MapReduce. So think about taking a book like the Hitchhiker’s Guide and counting the occurrences of all the words. The map step would create key/value pairs (i.e. dictionary or hash format) for every single word in the book. So a word is the key and a value like the number 1 would be applied (e.g. “hitchhiker”: 1, “galaxy”: 1, “guide”: 1, “hitchhiker”: 1). There would be duplicate keys in the output.

The reduce step condenses all duplicate keys like “hitchhiker” to a single, unique key for each word, where all the related values are put into a list (e.g. “hitchhiker”: [1, 1, 1, …, 1]). The list will contain a 1 for every occurrence of “hitchhiker” in the book. Then reduce can perform a summarization task by literally adding up the numbers or taking the length of the list. The reduce step also outputs a key/value pair for each unique word in the book with the summed value (e.g. “hitchhiker”: 42).
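
To make the flow concrete, here is a tiny pure-Python sketch of the word count logic (the real thing runs distributed across a cluster; the sample text is made up):

from collections import defaultdict

text = "hitchhiker galaxy guide hitchhiker"

# Map: emit a (word, 1) pair for every word in the text
mapped = [(word, 1) for word in text.split()]

# Shuffle/sort: group every value under its key
grouped = defaultdict(list)
for word, one in mapped:
    grouped[word].append(one)

# Reduce: condense each key's list of 1s into a single summed value
totals = {word: sum(ones) for word, ones in grouped.items()}
print(totals)  # {'hitchhiker': 2, 'galaxy': 1, 'guide': 1}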

This is a very simplistic example of MR and there are many complex variations based on the problem being solved. For example, MapReduce is cited as a solution for something like Twitter follower recommendations. There are a number of online resources that provide more examples, and just looking at other examples can help with defining the MR logic. A couple of resources I found in my research covered complex patterns and gave an overview of the different types of patterns.

Other Tools/Techniques to Understand:
In order to follow along, here are brief overviews of the key approaches and tools. I recommend reading further on each of them.

  • Hadoop = An Apache framework that helps distribute, store and process data across many machines (cluster)
  • HDFS = Hadoop Distributed File System is the storage solution that is part of the Hadoop framework. It is specifically geared for distributed systems.
  • S3 = Simple Storage Service is just an AWS storage system. You cannot open files or run programs in S3.
  • EC2 = Elastic Compute Cloud instances are virtual Amazon computers for rent. You can create, run and terminate servers as needed, which led to the term elastic. It’s like having one or more additional computers that you can configure how you need and run applications on, but without having to maintain the hardware or operating system.
  • EMR = Elastic MapReduce is an Amazon web service that pre-configures EC2 instances with Hadoop. Basically, it’s a service to easily spin up a cluster of Hadoop-configured machines without having to acquire and set up the hardware and software yourself.
  • MRJob = A package/library developed by Yelp so you can write Python MapReduce scripts that will run on Hadoop frameworks.
  • Screen = GNU software to multiplex terminal sessions/consoles, which means you can run a program encapsulated in its own environment. Put another way: if you kick off an EMR job from your work computer inside a screen session, you can detach the session, shut your computer down and go home, then log back into the screen session later and see that the job has been uninterrupted and is still processing.

Warning:
Hadoop should typically not be your immediate choice when analyzing data. I’ve heard this multiple times from different experts in the field. You really need to think about the type of problems you are solving and questions you are answering; the type, format and size of the data; and the time and money you can apply to the problem.

Many times in data science you can take adequate samples of data to answer questions and solve problems without needing Hadoop for processing. So when you take on a challenge and people are immediately saying Hadoop, take time to talk through if that is appropriate. Make your life easier by going for simpler solutions first before bringing out the big guns.

Overview of My Experience / Lessons Learned:

Abstract Challenge:
For the problem I worked on, I was trying to answer a number of questions around activity using event logs. Event data is prolific and can be a good case for using MR.

Data:
The event logs were JSON formatted, 1 event per line, in several files that were gzipped and stored on S3. The good thing about MRJob is that it has built-in protocols to handle unzipping and processing files. The only adjustment I made to my script, to handle the fact that each line was already in JSON format, was to add the lines below:

  • from mrjob.protocol import JSONValueProtocol
  • INPUT_PROTOCOL = JSONValueProtocol
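
For context, here is a minimal sketch of where those two lines sit in an MRJob script. The field name is hypothetical, but with this protocol each mapper call receives the line already parsed into a Python dict:

from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol

class EventCount(MRJob):
    # each input line is decoded from JSON before it reaches the mapper
    INPUT_PROTOCOL = JSONValueProtocol

    def mapper(self, _, event):
        # 'event' is a dict; 'event_type' is a hypothetical field name
        yield event.get('event_type', 'unknown'), 1

    def reducer(self, event_type, counts):
        yield event_type, sum(counts)

if __name__ == '__main__':
    EventCount.run()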

Approach:

Local & Small

  • Pulled 1 JSON event to analyze what data was available and focused on the data values needed to answer the question
  • Simulated/created a few variations on the JSON event example to cover key use cases for testing the MRJob script
  • Developed 1 MR step (mapper/reducer) to just pull simple data point counts on the dummy JSON values
  • Expanded MR to address the more complex question. This led to a multi-step MR job (2 rounds of map and reduce) which eventually condensed to 1 map and 2 reduce steps (see the sketch after this list)
    • If the questions would call for a SQL join or groupby to get the answer, then I used those data points as keys
    • Used a conditional in the map step to filter and to streamline the yielded results that needed to be sorted and condensed
  • During initial code development, ran and debugged MRJob code locally on dummy JSON event data which was also stored locally. Used the following bash command to run MRJob:
    • python [MRJob script] [data file] # this outputs results to stdout (e.g. terminal)
    • Note you can run just specific steps to focus debugging, like just a map step, by appending something like --mapper to the command above
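
Here is a rough sketch of what a multi-step job condensed to 1 map and 2 reduce steps can look like using MRJob’s MRStep; the event fields and the question (active days per user) are made up for illustration:

from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.protocol import JSONValueProtocol

class ActiveDays(MRJob):
    INPUT_PROTOCOL = JSONValueProtocol

    def steps(self):
        # 1 map and 2 reduce steps
        return [
            MRStep(mapper=self.mapper_filter, reducer=self.reducer_count),
            MRStep(reducer=self.reducer_summarize),
        ]

    def mapper_filter(self, _, event):
        # conditional filter; key on the fields you would otherwise join/group by in SQL
        if event.get('type') == 'click':
            yield (event.get('user_id'), event.get('date')), 1

    def reducer_count(self, user_date, counts):
        user_id, date = user_date
        yield user_id, (date, sum(counts))

    def reducer_summarize(self, user_id, date_counts):
        # e.g. how many distinct days each user was active
        yield user_id, len(list(date_counts))

if __name__ == '__main__':
    ActiveDays.run()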

Local & AWS Remote

  • Once the results from the dummy values looked good, spun up an EC2 instance
    • FYI, if you don’t have an account with AWS, sign up and get your access keys (id & secret)
  • Pulled 1 zip file from S3 onto the instance because it was too big for my personal computer
    • Used s3cmd (command line tool) to get access to data
    • s3cmd get [filename] # downloads file on EC2 instance
  • Unzipped file and pulled about 100 events into a sample file. Then exported the sample file back to S3
    • gunzip [filename]
    • head -n 100 [filename] > sample.txt
    • s3cmd put [filename] # store file back on s3
  • Ran exploratory data analysis using Pandas on the sample to verify data structure and results (see the short sketch after this list)
  • Referenced the sample data pulled down from S3 and ran the script locally to debug initially
    • python [MRJob script] [data file]
  • Once the script worked and the numbers made sense, then ran the file through EMR
    • set up the MRJob config file (see example further below)
    • set up the pem file and stored it on the computer running the MRJob script
    • used the following command:
    • python [MRJob script] -o "[s3 bucket for output]" -r emr "[s3 bucket for input]"
    • S3 file paths typically start with s3://, and make sure quotes are around the path
  • There were issues to debug on the sample, but once they were fixed, I set up a screen session and ran the code on the full data set
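
For the exploratory analysis step mentioned above, something along these lines works for a JSON-lines sample (the field names are hypothetical):

import json
import pandas as pd

# sample.txt holds 1 JSON event per line
with open('sample.txt') as f:
    events = pd.DataFrame(json.loads(line) for line in f)

print(events.shape)
print(events.columns)
print(events['type'].value_counts())  # 'type' is a hypothetical field; sanity-check against the MR output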

Configuration Tips:
To make it easier to run MRJob, use a configuration file. MRJob provides documentation on how to set this up. One way is to put the file in your home directory (~/) and name it .mrjob.conf so that MRJob automatically finds it. There are a number of things that can be pre-configured, which will shorten the command you have to type when running an EMR job.

runners:
  emr:
    cmdenv:
      TZ: America/
    aws_access_key_id: [your key]
    aws_secret_access_key: [your key]
    ssh_tunnel_to_job_tracker: true
    aws_region: us-west-2
    ec2_instance_type: m1.xlarge
    ec2_key_pair: [pem file name]
    ec2_key_pair_file: ~/.ssh/[pem file name].pem
    num_ec2_core_instances: 0
    enable_emr_debugging: true
    ami_version: latest

Note, the above is an example; there are many variations and additional parameters you can add and change based on the job you are running. Also, some parameters have to be entered at the command line and cannot be added to the configuration file.

When you first run a small sample job on EMR, do it with a single instance and on something with less horsepower. In this case, you would set the EC2 instance type in the m1 range and use 0 core instances (i.e. just the master instance). These settings control the instance type and number:

  • ec2_instance_type: m1.small
  • num_ec2_core_instances: 0

If your script calls for a package that is not standard on EMR, you will need to bootstrap/load the EMR instances with the package prior to running the job. In my case, I needed dateutil and in order to load it, I first needed to load pip. So I added the following commands to my config file:

bootstrap:
- sudo apt-get install -y python-pip || sudo yum install -y python-pip
- sudo pip install python-dateutil

Also when setting up AWS, you will need to create an EC2 pem file for encryption, better known as a key pair. This is different from the access keys. AWS provides a step-by-step process to set up the pem file. Be sure to remember the name you give it and to move the downloaded file into a folder you reference in your configuration. Most people typically put it in the .ssh folder in their home directory. Also, for Mac and Linux, be sure to change the file permissions by running chmod 400 [pem file name].pem in the terminal. You can confirm the permissions changed, if you are in the same folder as the pem file, by running: ls -la. The following settings reference the pem file in the .mrjob.conf file:

  • ec2_key_pair: [pem file name]
  • ec2_key_pair_file: ~/.ssh/[pem file name].pem

A couple additional setup tips:

  • When running on more than the sample data, be sure to add --no-output to the command line or configuration file. This makes sure that the job does not output all stdout values on your local computer when the full data set is being processed. You really don’t want that
  • Set S3 and EMR to the same region so Amazon will not charge for bandwidth used between them
  • Stop instances when you are not using them to save money

If you want to see what’s going on with the EMR instances, you can login while they are running and poke around. Login to the AWS console and go to EMR. Click on the cluster that is running your job. There will be a Master public DNS you will want to use. Just use the following command in your terminal:

  • ssh hadoop@[EMR Master public DNS] OR
  • ssh -i ~/.ssh/[pem file name].pem hadoop@[EMR Master public DNS]

This will connect your terminal directly to the EMR instances. You can poke around and see what’s going on while they are running. Unless you specify that EMR should stay alive after the job is done, the instances will terminate at the end of the job and boot you out.

Troubleshooting Pointers:
Data Inconsistency – Be careful to analyze the data and confirm what you actually have access to. This is a common challenge. In my case, there were missing values in different events I worked with, which required changing the code a few times to check for values as well as to get information out of other data points that were more consistent. Bottom line, don’t trust the data.
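
A sketch of the kind of defensive check this turned into, with hypothetical field names and fallbacks:

def extract_user_and_time(event):
    # pull the needed values defensively; 'user_id', 'timestamp' and 'created_at' are hypothetical fields
    user_id = event.get('user_id')
    timestamp = event.get('timestamp') or event.get('created_at')
    if user_id is None or timestamp is None:
        return None  # the caller skips inconsistent events instead of crashing the job
    return user_id, timestamp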

DateTime & UTC – I’ve wrestled with this devil many times in the past and it still tripped me up on this project. If your conditionals work with time, and with event logs they typically will, make sure to deliberately convert to and compare datetimes in UTC.
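
For example, with dateutil (the package bootstrapped above) you can parse event timestamps and normalize everything to UTC before comparing; the timestamps here are made up:

from dateutil import parser, tz

event_time = parser.parse('2015-03-14T09:26:53-07:00')    # offset-aware timestamp from an event
event_time_utc = event_time.astimezone(tz.tzutc())        # normalize to UTC

window_start = parser.parse('2015-03-14T00:00:00+00:00')  # comparison boundary, already UTC
print(event_time_utc >= window_start)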

Traceback Error – If your EMR job terminates with errors, you can go into the AWS dashboard and the EMR section. Choose the cluster that you ran, then expand the Steps area. Click on View jobs under the step that failed, then click on View tasks under the job that failed. Select View attempts next to a task that failed and choose its stderr link. That will open an error log that can help provide more context around what went wrong.

Final Thoughts:
There are so many variations on the process outlined above not to mention many different tools you can use. This post was to give some pointers on my approach at a pseudo high-level for those trying to figure out the end to end process. You really have to research and figure out what works best for your situation.

Where I would go next with this area is to play around with something like Spark to handle streaming data and to explore implementing machine learning algorithms on massive data. Although neural nets are more of a personal interest for me right now.

How Flask, Heroku & Alembic Play Together

I just spent a couple days getting up to speed on database migrations in general, how to make it work with Flask and Postgres and how to make them work on Heroku. There is some information out there but it took a little time hunting down what I needed; thus, I’ve summarized some of the main steps to help get others up and running with Flask, Heroku and Alembic.

Why migrate? Best practice is to avoid recreating your database(s). Usually you just want to make changes to the existing database(s) and track those changes. If at any time you need to go back to a previous version, the migration docs will help you easily revert to an old version / schema and then upgrade back to the most recent depending on your needs.

Migration Types: They are usually discussed as either schema (structure of the database) or data (the stored stuff – aka creamy filling). Sometimes they take place at the same time and sometimes not.

A Flask Migration Package Option: Alembic

Previous posts covered how much I leveraged the Flask Mega Tutorial for building a web application (app). In regards to migrations, Flask Mega primarily focuses on SQLite, which is not as helpful because Postgres is needed for Heroku deployment.

Alembic is a migration tool that is better maintained than the sqlalchemy-migrate package, and it is from SQLAlchemy’s author. There is documentation on how to set up and run database migrations. I’ve listed some of the main, basic steps needed to set up Alembic, migrate revisions and run it all on Heroku below. This assumes you’ve already created a database locally as well as added it on Heroku and promoted it to the default.

Where there is a $ or =>, the words following should be run at the command line, and yes, these directions are based on a Mac.

How to Start

  1. Install alembic $ pip install alembic
  2. Add to requirements $ pip freeze > requirements.txt
  3. Initialize it inside your project root folder $ alembic init alembic
  4. Ignore the .ini file for this basic installation
  5. Change env.py with the directions at this link. If the app is in the Flask Mega structure then just replace the references with app, but make sure your Config file is set up for Postgres:

import os

if os.environ.get('DATABASE_URL') is None:
    SQLALCHEMY_DATABASE_URI = 'postgresql://localhost/<db_name>'
else:
    SQLALCHEMY_DATABASE_URI = os.environ['DATABASE_URL']

  6. Create first revision $ alembic revision -m "First revision."
  7. Find and add change scripts for upgrade and downgrade to the new revision file (a minimal example revision file appears near the end of this post)
  8. Migrate $ alembic upgrade head
  9. Repeat 6 – 8 for further local revisions and migrations
  10. Revise Procfile:

migrate: alembic upgrade head
upgrade: alembic upgrade +1
downgrade: alembic downgrade -1

  11. Git add all changes $ git add .
  12. Git commit $ git commit -m "Procfile and running revisions"
  13. Push to Heroku $ git push heroku master
  14. Run alembic migrate on Heroku $ heroku run alembic upgrade head

If you get something like the following then it went well:

Running `alembic upgrade head` attached to terminal… up, run.****
INFO [alembic.migration] Context impl PostgresqlImpl.
INFO [alembic.migration] Will assume transactional DDL.
INFO [alembic.migration] Running upgrade None -> *********, Create account table
INFO [alembic.migration] Running upgrade ******** -> ********* Add zoomlevel to locations.
INFO [alembic.migration] Running upgrade ******** -> *********, Test add favorite.
INFO [alembic.migration] Running upgrade ******** -> *******, Test add favorite.

To double check changes went through on Heroku, here are a couple commands:

  1. Launch Postgres interactive environment on Heroku $ heroku pg:psql
  2. Look at tables => \dt
  3. Look at table schema => \d <table name>

That should cover it to get started. Creating a revision file and migrating locally as well as on Heroku are steps that should be repeated for each new migration.
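
For reference, a hand-written revision file is mostly just an upgrade() and a downgrade() function built from Alembic’s op directives. Here is a minimal sketch, with placeholder revision ids and the zoomlevel change from the log output above:

"""Add zoomlevel to locations."""

from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic (placeholders here; Alembic generates the real ones)
revision = 'abc123def456'
down_revision = '123456abcdef'

def upgrade():
    op.add_column('locations', sa.Column('zoomlevel', sa.Integer, nullable=True))

def downgrade():
    op.drop_column('locations', 'zoomlevel')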

As always, there is more info out there on the nuances and complexities of migrations. There is also autogenerate functionality that can automatically define change scripts for things like a schema change, but it is limited in what it can do. Check that out in the reference documents. No matter what, I recommend always taking a look at the revision file just to make sure it will do what you need. And have fun migrating.