MGTA 495: Assignment Week 9

Word Count on Amazon EMR


Tasks:

Due: Tuesday 9th March 11:59 PM PST


Remember: when in doubt, read the documentation first. It's always helpful to search for the class that you're trying to work with, e.g. pyspark.sql.DataFrame.

PySpark API Documentation: https://spark.apache.org/docs/latest/api/python/index.html

1. Upload the 1M dataset to S3

To make the datasets available to the EMR cluster, we need to upload the data files to Amazon S3. Follow these steps to do so:

  1. In the Amazon console, open the Services menu on the top left and select S3
  2. Create a bucket if you don't have one yet. Use the default settings, but your bucket name must be unique.
  3. Create a folder in your bucket, e.g. data, using the default settings. (Don't upload the data file to the root of the bucket; we'll also use this bucket for later assignments, so it's good to keep everything organized.)
  4. Enter the folder and upload the txt file. Do NOT upload the zip, as Spark won't know what to do with it.

You can use this dataset now: copy its S3 URI, read it into a dataframe, and explore it however you like.

This exercise is only meant to show you how to create your own S3 buckets and read data from them. The actual task is to read the data (BookReviews_5M.txt) from a different S3 bucket and work on that dataset.
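For example, reading your uploaded file into a dataframe might look like the sketch below. The bucket, folder, and file names are placeholders; substitute the ones you actually used:

    # A minimal sketch, assuming the txt file was uploaded to a folder named `data`
    # in your own bucket; both names are placeholders.
    reviews_1m = spark.read.text("s3://<your-bucket>/data/<your-1M-file>.txt")
    reviews_1m.show(5, truncate=False)   # quick sanity check of the first few lines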

2. Copy the 5M dataset to HDFS

We'll copy only the 5M dataset to HDFS. The copy is done by adding a step while creating the cluster.

On Amazon EMR, this works differently than on the local container. We have already uploaded the 5M reviews data to the S3 bucket s3://rsm-emr01. Follow the steps below to copy the data into the HDFS of your EMR cluster:

  1. In AWS, go to Services -> EMR.
  2. Click on 'Create cluster'.
  3. Click on 'Go to advanced options'.
  4. Select EMR release 6.2.0 and add the required software packages as shown in class.
  5. In the section that says 'Steps (optional)', select 'Custom JAR' and click on 'Add Step'.
  6. Enter Custom JAR as Step type, command-runner.jar as JAR location (remove the "s3://" prefix!), and s3-dist-cp --src=s3://rsm-emr01/data --dest=hdfs:///data as Arguments
  7. Let the 'Action on failure' remain as 'Continue' and click on Add.
  8. Specify the instance count for master and core nodes
  9. Give your cluster a name, select an EC2 keypair that you should have created earlier. If you have not created an EC2 keypair, stop here. Go back and create a keypair first, then come back to this step.
  10. Proceed to create the cluster and wait for it to complete; this typically takes ~5-8 minutes. The required data files will then be under hdfs:///data/, and we will read the 5M dataset from hdfs:///data/BookReviews_5M.txt to compute the word counts.

Note that the Notebook container (which hosts the notebook and is separate from the EMR cluster) does not have an HDFS installation, which means the usual hadoop commands are not available to you. Instead, you must check whether the data was copied correctly by reading it in this notebook. (If this is confusing, don't worry; you won't need the hadoop CLI for this assignment.)
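For example, once the cluster step has finished, a quick read from HDFS in this notebook is enough to confirm the copy worked (a sketch, not a required output):

    # A minimal sketch: if the s3-dist-cp step succeeded, this path exists and the
    # read works; otherwise Spark raises an error about the missing path.
    check_df = spark.read.text("hdfs:///data/BookReviews_5M.txt")
    print(check_df.count())   # should be on the order of 5 million lines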

3. Start Spark Session

Note that you don't need to manually start the Spark session. Amazon does this for you in the background, so the Spark session is started as soon as you import pyspark.

Remember that the kernel for running this Notebook is PySpark and not Python 3.
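In practice, that means a first cell like the sketch below should work without any SparkSession.builder boilerplate; the PySpark kernel defines spark for you:

    # A minimal sketch: no explicit SparkSession setup is needed on EMR with the
    # PySpark kernel; `spark` is already defined when this cell runs.
    import pyspark
    print(spark.version)   # prints the Spark version (3.0.x on EMR 6.2.0)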

4. Examine the data

Your task:

  1. Examine the contents of the dataframe that you've just read from file.

Expected output:

  1. Print the schema of the raw dataframe, as well as its first 25 rows.
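A sketch of one way to do this is shown below; the dataframe name raw_df is chosen here only for illustration:

    # A minimal sketch: read the raw text file from HDFS and inspect it.
    raw_df = spark.read.text("hdfs:///data/BookReviews_5M.txt")
    raw_df.printSchema()              # a single string column named `value`
    raw_df.show(25, truncate=False)   # the first 25 rows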

5. Clean the data

Your task:

  1. Remove all punctuation and convert all characters to lower case.

Expected output:

  1. The first 25 rows of a dataframe, with a column containing the cleaned sentences.
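One possible approach, sketched below, uses regexp_replace and lower from pyspark.sql.functions; the regex and the column name sentence are assumptions, and any equivalent cleaning works (this continues from the raw_df sketch above):

    from pyspark.sql import functions as F

    # A minimal sketch: drop every character that is not a letter, digit, or space,
    # then lower-case what remains. `sentence` is an illustrative column name.
    cleaned_df = raw_df.select(
        F.lower(F.regexp_replace(F.col("value"), r"[^a-zA-Z0-9 ]", "")).alias("sentence")
    )
    cleaned_df.show(25, truncate=False)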

6. Get dataframe containing unique words and their counts

Your task:

  1. Split each sentence into words, using the space character (' ') as the delimiter.
  2. Put each word into its own row and count the occurrences of each unique word. Put your results into a new dataframe (a sketch of one approach follows below).
  3. Print out the first 5 rows of the dataframe.

Expected output:

  1. First 5 rows of the output dataframe.
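A sketch of one possible approach, using split, explode, and a groupBy count (continuing from cleaned_df above; column and dataframe names are illustrative):

    from pyspark.sql import functions as F

    # A minimal sketch: split each cleaned sentence on spaces, put one word per row,
    # then count how many times each unique word appears.
    words_df = cleaned_df.select(F.explode(F.split(F.col("sentence"), " ")).alias("word"))
    word_counts_df = words_df.groupBy("word").count()
    word_counts_df.show(5)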

7. Sort the word count dataframe in descending order

Your task:

  1. Sort the previous dataframe by the counts column in descending order. Put your results into a new dataframe.

Expected output:

  1. First 25 rows of the sorted word count dataframe; the first row should have the maximum count.
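For example, continuing from the word_counts_df sketch above:

    from pyspark.sql import functions as F

    # A minimal sketch: sort by the `count` column produced by groupBy().count(),
    # largest counts first.
    sorted_counts_df = word_counts_df.orderBy(F.desc("count"))
    sorted_counts_df.show(25)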

8. Record the execution time

Your task:

  1. Print the execution time.

Expected output: The execution time. No particular value is expected.
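One common pattern, sketched below as an assumption (your notebook template may already provide its own timing cells), is to record time.time() in the first cell and print the difference at this point:

    import time

    # A minimal sketch: in the real notebook, `start_time` would be set in a cell at
    # the very top, and the print would run after all the work has finished.
    start_time = time.time()   # placed here only so the sketch runs on its own
    # ... the rest of the notebook's work happens in between ...
    print(f"Execution time: {time.time() - start_time:.2f} seconds")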

9. Save the sorted word counts directly to S3 as a CSV file

NOTE: Spark uses a distributed memory system, and stores working data in fragments known as "partitions". This is advantageous when a Spark cluster spans multiple machines, as each machine only needs part of the working data to do its own job. By default, Spark saves each of these data partitions to an individual file to avoid I/O collisions. We want only one output file, so we'll need to fuse all the data into a single partition first.

Your task:

  1. Coalesce the previous dataframe to one partition. This makes sure that all our results will end up in the same CSV file.
  2. Save the 1-partition dataframe to S3 using the DataFrame.write.csv() method (see the sketch after this list). Be sure to store the file in S3 at a location you can remember. The save path should look something like s3://<your-bucket>/<your-folder>/<your-result-file>.csv; change these parameters to point to your own bucket and folder.
  3. Remember to save the CSV file along with the header.
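A sketch of what the write could look like, continuing from the sorted_counts_df sketch above (the S3 path is a placeholder; point it at your own bucket and folder):

    # A minimal sketch: collapse the results to a single partition, then write one
    # CSV file with a header row. Replace the placeholder path with your own.
    sorted_counts_df.coalesce(1).write.csv(
        "s3://<your-bucket>/<your-folder>/<your-result-file>.csv",
        header=True,
    )

Note that Spark treats this path as a directory: the actual data lands in a part-00000-...csv file inside it, which is the file you will download in section 10.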

Note:

You only need to run sections 9 and 10 once, for the 5M dataset.

Section 11 requires you to run this Notebook multiple times; for those runs you can comment out the code in section 9 so that the Notebook is easier to re-run.

10. Download the CSV file from S3 to your local machine and create the expected CSV output file

  1. Navigate to the S3 folder where you stored your output
  2. Note the name of this file; it should look something like part-00000-xx.....xx.csv.
  3. Click on this file; this opens the file properties.
  4. Beside 'Copy S3 URI', click on 'Object actions' and then click on 'Download'.
  5. After downloading the file, you can rename it to anything, say results.csv.
  6. We want you to submit a CSV containing the first 100 rows of the results file. To do this, use the command head -n 100 results.csv > 100_rows.csv in a terminal. You can also do so manually, since CSV files are plain text. Remember that we want the first 100 lines, which include the header - so it is the header plus 99 data rows.

Note on Autograder

The autograder will check whether the results you submit in the 100_rows.csv file exactly match the expected results. The autograder runs on your submitted csv file, not on the notebook, so you are free to change the notebook in any way that you want.

The csv file should look something like this (the top words and their counts are shown only for illustration; the actual words and counts will differ in your output):

    word,count
    the,123
    i,121
    and,99
    ...97 more rows

11. Execution times on different datasets and settings

You need to experiment with running this whole Jupyter Notebook using different numbers of master and core nodes, and report the execution time of the Notebook as noted in the earlier section.

  1. Create a cluster with the required number of master and core nodes.
  2. Then go to the Kernel tab in JupyterLab and do 'Restart and run all cells'.
  3. Note the time printed in the cell just before section 9; this is the time it took for all the code to run.
  4. Then start a new cluster with a different configuration of master and core nodes (and dataset, as required), run the Notebook again, and note the execution times.

Fill in the times in the table below.

Dataset  #Master Nodes  #Core Nodes  Runtime_1  Runtime_2  Runtime_3  Mean    Std
1M       1              1            44.94      44.98      44.87      44.93   0.056
5M       1              1            114.71     114.89     114.76     114.79  0.093
5M       1              3            49.48      49.52      49.58      49.53   0.050

12. Screenshots of terminated EMR clusters

You need to add a screenshot of your Amazon EMR 'Clusters' page which shows that all of your clusters have been terminated after you are done with your assignment.

To submit the screenshot, follow the instructions on Gradescope; there will be a separate submission item for this screenshot.