Amazon Elastic MapReduce (EMR) is a managed cluster platform on Amazon Web Services (AWS) for big data processing and analysis. It provides a simpler way to run big data frameworks such as Apache Hadoop and Apache Spark.

This post will focus on running Apache Spark on EMR, and will cover:

  • Create a cluster on Amazon EMR
  • Submit the Spark Job
  • Load/Store data from/to S3

Prerequisites

  1. A well-developed Spark application
  2. Input files
  3. An AWS account
  4. An AWS S3 bucket to store input/output files, logs and Spark application JAR file

Before we create a cluster on EMR, the Spark application JAR and the input files should be uploaded to the S3 bucket.
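
The upload can be done from the S3 console, with aws s3 cp, or from Python. Below is a minimal sketch using boto3's upload_file. The local paths, the helper names plan_uploads and upload_all, and the jars/ and inputs/ key prefixes are assumptions chosen to match the S3 locations used later in this post.

```python
import os


def plan_uploads(bucket, jar_path, input_paths):
    """Map each local file to the (local_path, bucket, key) it should be uploaded to."""
    plan = [(jar_path, bucket, "jars/" + os.path.basename(jar_path))]
    for path in input_paths:
        plan.append((path, bucket, "inputs/" + os.path.basename(path)))
    return plan


def upload_all(s3_client, plan):
    """Upload every planned file with S3.Client.upload_file(Filename, Bucket, Key)."""
    for local_path, bucket, key in plan:
        s3_client.upload_file(local_path, bucket, key)


def main():
    # boto3 is imported here so the planning helper can be used without AWS access.
    import boto3

    plan = plan_uploads(
        "my-bucket",
        "target/file-transform-script.jar",  # hypothetical local JAR path
        ["data/part-0001.json"],             # hypothetical local input file
    )
    upload_all(boto3.client("s3"), plan)
```

Calling main() performs the upload with whatever AWS credentials are configured locally.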

Create a cluster on Amazon EMR

The cluster is the core component of EMR. A cluster is a collection of nodes, and each node is an Amazon Elastic Compute Cloud (Amazon EC2) instance. Each node has a role within the cluster, referred to as the node type: master node, core node, or task node.

There are several ways to create a cluster on EMR. In this section, we are going to demonstrate two of them: the AWS CLI and the SDK for Python (Boto3).

Create a cluster using the AWS CLI

The following create-cluster example creates a simple EMR cluster to run Spark.

$ aws emr create-cluster \
    --name "simple cluster" \
    --release-label emr-5.20.0 \
    --instance-type m3.xlarge \
    --instance-count 3 \
    --applications Name=Spark

  • name: Name of the cluster. If not provided, the default is “Development Cluster”.
  • release-label: Amazon EMR release version.
  • instance-type: Type of Amazon EC2 instance to use in a cluster.
  • instance-count: Number of Amazon EC2 instances to create for a cluster. One instance is used for the master node, and the remainder are used for the core node type.
  • applications: Applications to install on the cluster.

For more examples and details about the options, you can read the create-cluster documentation.

Create a cluster using the SDK for Python

In boto3, run_job_flow creates and starts running a new cluster (job flow).

import boto3

emr_client = boto3.client('emr', region_name='us-west-2')

emr_client.run_job_flow(
    Name='simple cluster',
    ReleaseLabel='emr-5.20.0',
    Instances={
        'InstanceGroups': [
            {
                'Name': 'Master - 1',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm3.xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': 'Core - 2',
                'InstanceRole': 'CORE',
                'InstanceType': 'm3.xlarge',
                'InstanceCount': 2,
            }
        ],
    },
    Applications=[
        {
            'Name': 'Spark'
        }
    ]
)

Note that Name is a required parameter here, while --name is not required when you use the AWS CLI to create a cluster.
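
In practice, a cluster created through the API usually also needs IAM roles, and a cluster with no steps shuts down unless it is told to stay alive. The sketch below builds the same request from an instance count and waits for the cluster to come up. It is not the only way to do this. It assumes the default roles created by aws emr create-default-roles exist in the account. The helper names build_cluster_request and create_and_wait are made up for this example.

```python
def build_cluster_request(name, release_label, instance_type, instance_count):
    """Build run_job_flow keyword arguments: one master node, the rest core nodes."""
    groups = [{
        "Name": "Master - 1",
        "InstanceRole": "MASTER",
        "InstanceType": instance_type,
        "InstanceCount": 1,
    }]
    if instance_count > 1:
        groups.append({
            "Name": "Core - %d" % (instance_count - 1),
            "InstanceRole": "CORE",
            "InstanceType": instance_type,
            "InstanceCount": instance_count - 1,
        })
    return {
        "Name": name,
        "ReleaseLabel": release_label,
        "Instances": {
            "InstanceGroups": groups,
            # Assumption: keep the cluster alive so steps can be added later.
            "KeepJobFlowAliveWhenNoSteps": True,
        },
        "Applications": [{"Name": "Spark"}],
        # Assumption: default roles from `aws emr create-default-roles`.
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "ServiceRole": "EMR_DefaultRole",
    }


def create_and_wait(emr_client, request):
    """Start the cluster, block until it is running or waiting, return its ID."""
    cluster_id = emr_client.run_job_flow(**request)["JobFlowId"]
    emr_client.get_waiter("cluster_running").wait(ClusterId=cluster_id)
    return cluster_id
```

With a real client, this would be called as create_and_wait(emr_client, build_cluster_request('simple cluster', 'emr-5.20.0', 'm3.xlarge', 3)). The returned ID is the one needed later to add steps.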

Submit the Spark Job

A cluster does no useful work until you submit a job to it. In EMR, jobs are submitted as steps. Each step is a unit of work that contains instructions to manipulate data for processing by software installed on the cluster. To run a Spark job when the cluster starts, add a --steps option to the create-cluster command above.

In a step, you have to define its type and name, tell EMR where your JAR file is located, and pass all the parameters your application needs.

Suppose we have a Spark application that converts files from JSON to Parquet. We have to pass the input location and the output location to the main class file.transform.Main.

For this Spark job, the inputs and outputs are:

  • Inputs: Some JSON files on S3 at s3://my-bucket/inputs.
  • Outputs: The outputs will be Parquet files, written to another S3 location, s3://my-bucket/outputs.
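
To make the job concrete, here is a rough PySpark equivalent of such a transformation. This is not the author's application, which is a JAR with the main class file.transform.Main. The function names are hypothetical, and the sketch only shows the shape of the work: read JSON from the first location, write Parquet to the second.

```python
def parse_locations(argv):
    """Return (input_path, output_path) from the command-line arguments."""
    if len(argv) != 3:
        raise SystemExit("usage: file_transform.py <input-path> <output-path>")
    return argv[1], argv[2]


def main(argv):
    # pyspark is imported lazily; it is available on a cluster with Spark installed.
    from pyspark.sql import SparkSession

    input_path, output_path = parse_locations(argv)
    spark = SparkSession.builder.appName("file-transform").getOrCreate()
    try:
        # With the default save mode, the write fails if the output path already exists.
        spark.read.json(input_path).write.parquet(output_path)
    finally:
        spark.stop()
```

A script version would call main(sys.argv) at its entry point. The step would then pass the script's location to spark-submit in place of --class and the JAR.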

Note that EMR needs read and write permissions to this S3 bucket.
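
As an illustration of what read and write permissions means in IAM terms, the sketch below builds a policy document scoped to a single bucket. It assumes the policy would be attached to the role used by the cluster's EC2 instances. The default EMR roles may already grant broader S3 access, in which case nothing extra is needed. The function name is hypothetical.

```python
import json


def bucket_access_policy(bucket):
    """Build an IAM policy document granting read/write access to one bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Listing applies to the bucket itself.
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": ["arn:aws:s3:::%s" % bucket],
            },
            {
                # Reads, writes and deletes apply to the objects inside the bucket.
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
                "Resource": ["arn:aws:s3:::%s/*" % bucket],
            },
        ],
    }


print(json.dumps(bucket_access_policy("my-bucket"), indent=2))
```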

At this point, we have packaged the application into a JAR file and uploaded it to S3 at s3://my-bucket/jars/file-transform-script.jar.

Submit a step using the AWS CLI

You can use a shorthand syntax:

--steps Type=string,Name=string,ActionOnFailure=string,Jar=string,Args=string,string,MainClass=string,Properties=string ...

Or use JSON syntax, which is clearer. Save the following as step.json:

[
  {
    "Type": "CUSTOM_JAR",
    "Name": "spark job step",
    "ActionOnFailure": "TERMINATE_CLUSTER",
    "Jar": "command-runner.jar",
    "Args": [
      "spark-submit",
      "--class", "file.transform.Main",
      "s3://my-bucket/jars/file-transform-script.jar",
      "s3://my-bucket/inputs",
      "s3://my-bucket/outputs"
    ]
  }
]
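
If the locations change between runs, the step file can be generated instead of edited by hand. The following is a small sketch that produces the same JSON structure as above. The helper names cli_step and steps_json are made up for this example.

```python
import json


def cli_step(name, main_class, jar, app_args, action_on_failure="TERMINATE_CLUSTER"):
    """Build one step definition in the JSON form accepted by --steps file://..."""
    return {
        "Type": "CUSTOM_JAR",
        "Name": name,
        "ActionOnFailure": action_on_failure,
        "Jar": "command-runner.jar",
        "Args": ["spark-submit", "--class", main_class, jar] + list(app_args),
    }


def steps_json(steps):
    """Serialize a list of step definitions to a JSON string."""
    return json.dumps(steps, indent=2)


step = cli_step(
    "spark job step",
    "file.transform.Main",
    "s3://my-bucket/jars/file-transform-script.jar",
    ["s3://my-bucket/inputs", "s3://my-bucket/outputs"],
)
print(steps_json([step]))
```

Writing the printed text to step.json gives a file that can be passed to the CLI with --steps file://step.json.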

With step.json, you can create a cluster with one step with the following command:

$ aws emr create-cluster \
    --name "simple cluster" \
    --release-label emr-5.20.0 \
    --instance-type m3.xlarge \
    --instance-count 3 \
    --applications Name=Spark \
    --steps file://step.json

You can also add one or more steps to an existing cluster:

$ aws emr add-steps \
    --cluster-id j-XXXXXXX \
    --steps file://step.json

The cluster ID can be found in the EMR console or by running aws emr list-clusters.
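
The same lookup can be done from Python. The sketch below uses the list_clusters paginator in boto3 and filters by name on the client side. The function name and the choice of which states count as active are assumptions.

```python
ACTIVE_STATES = ["STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"]


def find_cluster_ids(emr_client, name):
    """Return the IDs of active clusters whose name matches exactly."""
    paginator = emr_client.get_paginator("list_clusters")
    ids = []
    for page in paginator.paginate(ClusterStates=ACTIVE_STATES):
        for cluster in page.get("Clusters", []):
            if cluster["Name"] == name:
                ids.append(cluster["Id"])
    return ids
```

Cluster names are not unique, so the function returns a list rather than a single ID.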

Submit a step using the SDK for Python

def add_step():
    step = [{
        'Name': 'spark job step',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--class',
                'file.transform.Main',
                's3://my-bucket/jars/file-transform-script.jar',
                's3://my-bucket/inputs',
                's3://my-bucket/outputs'
            ]
        }
    }]
    return step

  • Name: The name of the step.
  • ActionOnFailure: The action to take when a step fails. There are three possible values: TERMINATE_CLUSTER, CANCEL_AND_WAIT, and CONTINUE.
  • HadoopJarStep: The JAR file used for the step.
    • Jar: A path to a JAR file run during the step.
    • Args: A list of command line arguments passed to the JAR file’s main function when executed.

With command-runner.jar you can execute many scripts or programs on the cluster without having to know their full paths. In the case above, spark-submit is the command to run.
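
Because command-runner.jar simply runs the command given in Args, the same step structure works for other tools. The sketch below wraps an arbitrary command line in a step. The helper name command_step and the HDFS destination are assumptions. s3-dist-cp is used as a second example of a command that command-runner.jar can run.

```python
def command_step(name, command, action_on_failure="CONTINUE"):
    """Wrap a command line (a list of strings) in a command-runner.jar step."""
    return {
        "Name": name,
        "ActionOnFailure": action_on_failure,
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": list(command),
        },
    }


spark_step = command_step("spark job step", [
    "spark-submit",
    "--class", "file.transform.Main",
    "s3://my-bucket/jars/file-transform-script.jar",
    "s3://my-bucket/inputs",
    "s3://my-bucket/outputs",
])

# Hypothetical extra step: copy the inputs from S3 into HDFS before processing.
copy_step = command_step("copy inputs to HDFS", [
    "s3-dist-cp", "--src", "s3://my-bucket/inputs", "--dest", "hdfs:///inputs",
])
```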

emr_client.run_job_flow(
    Name='simple cluster',
    ReleaseLabel='emr-5.20.0',
    Instances={
        'InstanceGroups': [
            {
                'Name': 'Master - 1',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm3.xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': 'Core - 2',
                'InstanceRole': 'CORE',
                'InstanceType': 'm3.xlarge',
                'InstanceCount': 2,
            }
        ],
    },
    Applications=[
        {
            'Name': 'Spark'
        }
    ],
    # add_step() already returns a list of step definitions
    Steps=add_step()
)

Use add_job_flow_steps to add steps to an existing cluster:

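emr_client.add_job_flow_steps(
    JobFlowId=cluster_id,  # the ID of the existing cluster, e.g. 'j-XXXXXXX'
    Steps=add_step()       # add_step() already returns a list of step definitions
)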
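
add_job_flow_steps returns as soon as the steps are queued, so a script that needs the results has to wait for them. Here is a minimal polling sketch built on describe_step. The function names and the polling interval are assumptions. boto3 also ships a step_complete waiter that could be used instead.

```python
import time

TERMINAL_STATES = {"COMPLETED", "CANCELLED", "FAILED", "INTERRUPTED"}


def wait_for_step(emr_client, cluster_id, step_id, poll_seconds=30):
    """Poll describe_step until the step reaches a terminal state, then return it."""
    while True:
        response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
        state = response["Step"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)


def submit_and_wait(emr_client, cluster_id, steps, poll_seconds=30):
    """Queue the steps, wait for each one, and return {step_id: final_state}."""
    response = emr_client.add_job_flow_steps(JobFlowId=cluster_id, Steps=steps)
    return {
        step_id: wait_for_step(emr_client, cluster_id, step_id, poll_seconds)
        for step_id in response["StepIds"]
    }
```

For example, submit_and_wait(emr_client, cluster_id, add_step()) returns a dictionary whose values show whether each step completed or failed.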

The job will consume all of the data in the input directory s3://my-bucket/inputs, and write the result to the output directory s3://my-bucket/outputs.
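
Once the step has finished, one quick check is to list what was written under the output prefix. The sketch below uses the list_objects_v2 paginator in boto3. The function name is hypothetical, and the .parquet suffix filter assumes that the output files end with that extension, as Spark's Parquet files typically do.

```python
def list_output_files(s3_client, bucket, prefix, suffix=".parquet"):
    """Return the keys under a prefix that end with the given suffix."""
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # Pages for an empty prefix carry no "Contents" entry.
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(suffix):
                keys.append(obj["Key"])
    return keys
```

With a real client, list_output_files(boto3.client('s3'), 'my-bucket', 'outputs/') returns the Parquet files the job produced, or an empty list if nothing was written.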

Above are the steps to run a Spark Job on Amazon EMR.

Reference