Calculate the average and standard deviation of the execution times over 3 runs for these three settings:
BookReviews_1M - 1 master + 1 worker node
BookReviews_5M - 1 master + 1 worker node
BookReviews_5M - 1 master + 3 worker nodes
Note that worker nodes are also called core nodes when initializing them on AWS.
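For reference, a minimal sketch of how you could compute the mean and sample standard deviation from three recorded runtimes using Python's statistics module (the runtime values below are placeholders; substitute your own measurements):

# Compute mean and sample standard deviation of three measured runtimes (placeholder values)
import statistics
runtimes = [49.48, 49.52, 49.58]  # replace with your three execution times in seconds
print(statistics.mean(runtimes))   # average runtime
print(statistics.stdev(runtimes))  # sample standard deviation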
Submit (4 files)
Remember: when in doubt, read the documentation first. It's always helpful to search for the class that you're trying to work with, e.g. pyspark.sql.DataFrame.
PySpark API Documentation: https://spark.apache.org/docs/latest/api/python/index.html
To make the datasets available to the EMR cluster, we need to upload the data files to Amazon S3. Follow these steps to do so:
Upload the data file to a folder named data in your S3 bucket, using the default settings. (Don't upload the data file to the root of the bucket; we'll also use this bucket for later assignments, so it's good to keep everything organized.) You can use this dataset now: read from it by copying the S3 URI, convert it to a dataframe, and do anything else you want.
This exercise is only to help you understand how to create your own S3 buckets and read data from them. The actual task is to read data (BookReviews_5M.txt) from a different S3 bucket and work on that dataset.
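As a quick illustration, here is a minimal sketch of reading your own uploaded file into a dataframe (the bucket and file names are placeholders; copy the actual S3 URI from the S3 console):

# Read a text file from your own S3 bucket into a dataframe (placeholder URI)
myDF = spark.read.text("s3://<your-bucket>/data/<your-file>.txt")
myDF.show(5)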
We'll only copy the 5M dataset to HDFS, and we'll do so by adding a step while creating the cluster. On Amazon EMR this works differently than on the local container. We have already uploaded the 5M reviews data to the S3 bucket s3://rsm-emr01. Follow the steps below to copy the data into the HDFS of your EMR cluster:
When adding the step, choose Custom JAR as the Step type, command-runner.jar as the JAR location (remove the "s3://" prefix!), and s3-dist-cp --src=s3://rsm-emr01/data --dest=hdfs:///data as the Arguments. This copies the data to hdfs:///data/. We will use this path to read the BookReviews_5M.txt dataset, i.e. hdfs:///data/BookReviews_5M.txt, and use this data to find the word counts.

Note that the Notebook container (which hosts the notebook and is separate from the EMR cluster) does not have an HDFS installation, which means that the usual hadoop commands are not available to you. You must check that the data was copied correctly by reading it in this notebook. Ignore this statement if it is confusing to you - you won't need it for this assignment, at least.
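For example, one way to sanity-check the copy from within this notebook (assuming the s3-dist-cp step above completed successfully):

# Verify the copy by reading the file back from HDFS and counting its lines
checkDF = spark.read.text("hdfs:///data/BookReviews_5M.txt")
print(checkDF.count())  # expect roughly 5 million lines if the copy succeeded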
Note that you don't need to manually start the spark session. Amazon does this for you in the background, so the spark session is started as soon as you import pyspark.
Remember that the kernel for running this Notebook is PySpark and not Python 3.
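If you were running PySpark outside EMR (for example in the local container), you would typically create the session yourself; a minimal sketch, not needed on EMR where the spark object already exists (the app name here is arbitrary):

# Only needed outside EMR - on EMR the `spark` session is created for you automatically
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BookReviewsWordCount").getOrCreate()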
# Initialize Spark
import pyspark
from pyspark.sql import SparkSession, Row
print (spark.version, pyspark.version.__version__)
# Record the starting time of execution for timing this notebook
import time
start_time = time.time()
# Read data from HDFS or S3 - for this assignment, you should read from HDFS,
# although you could in principle read directly from S3.
# Provide the HDFS file path of the 5M dataset.
dataFileName = 'hdfs:///data/BookReviews_5M.txt'
#dataFileName='s3://week9-mgta495/data/week9-data/BookReviews_1M.txt'
# Read data from the above file path and convert it to a dataframe.
textDF = spark.read.text(dataFileName)
Your task: Print the schema of the dataframe, then print its first 25 rows.
Expected output:
# YOUR CODE HERE to print the schema
textDF.printSchema()
# YOUR CODE HERE to print the first 25 rows of the dataframe
textDF.show(25)
root
 |-- value: string (nullable = true)

+--------------------+
|               value|
+--------------------+
|This was the firs...|
|Also after going ...|
|As with all of Ms...|
|I've not read any...|
|This romance nove...|
|Carolina Garcia A...|
|Not only can she ...|
|Once again Garcia...|
|The timing is jus...|
|Engaging. Dark. R...|
|Set amid the back...|
|This novel is a d...|
|If readers are ad...|
| Reviewed by Phyllis|
|      APOOO BookClub|
|A guilty pleasure...|
|In the tradition ...|
|Beryl Unger, top ...|
|What follows is a...|
|The book flap say...|
|I'd never before ...|
|The novel's narra...|
|It is centered on...|
|If you like moder...|
|Beryl Unger is a ...|
+--------------------+
only showing top 25 rows
Your task:
Expected output:
# Do not change this cell.
# NOTE: Counterintuitively, column objects do NOT store any data; instead they store column expressions (transformations).
# The below function takes in a column object, and adds more expressions to it to make a more complex transformation.
# Once we have a column object representing the expressions we want, use DataFrame.select(column) to apply the expressions
from pyspark.sql.functions import regexp_replace, trim, col, lower
def removePunctuation(column):
"""Removes punctuation, changes to lower case, and strips leading and trailing spaces."""
return trim(lower(regexp_replace(column, "[^A-Za-z0-9 ]", ""))).alias("sentence")
# Recommended: take a look at the contents of a column object returned from removePunctuations. What's in there?
# No answers or outputs required for this cell.
print(removePunctuation(textDF.value))
Column<b'trim(lower(regexp_replace(value, [^A-Za-z0-9 ], ))) AS `sentence`'>
# execute the column expressions generated by removePunctuation() to clean the sentences
# After that, use the show() function to print the first 25 rows of the dataframe
# Hint: you'll need the Column object returned by removePunctuations().
# YOUR CODE HERE for printing the expected output.
rmv_punc_DF = textDF.select(removePunctuation(textDF.value))
rmv_punc_DF.show(25)
+--------------------+
|            sentence|
+--------------------+
|this was the firs...|
|also after going ...|
|as with all of ms...|
|ive not read any ...|
|this romance nove...|
|carolina garcia a...|
|not only can she ...|
|once again garcia...|
|the timing is jus...|
|engaging dark rea...|
|set amid the back...|
|this novel is a d...|
|if readers are ad...|
| reviewed by phyllis|
|      apooo bookclub|
|a guilty pleasure...|
|in the tradition ...|
|beryl unger top e...|
|what follows is a...|
|the book flap say...|
|id never before r...|
|the novels narrat...|
|it is centered on...|
|if you like moder...|
|beryl unger is a ...|
+--------------------+
only showing top 25 rows
Your task: Split each cleaned sentence into words and explode the result so that each row contains a single word, then print the first 5 rows of the resulting dataframe.
Expected output:
# We assemble the 'split' and 'explode' column expressions, then apply them to the sentence column
from pyspark.sql.functions import split, explode
# YOUR CODE HERE for printing the first 5 rows of the dataframe after the required operations
DF1 = rmv_punc_DF.select(split("sentence"," ").alias("csv"))
DF2 = DF1.select(explode("csv").alias("word"))
DF2.show(5)
+-----+
| word|
+-----+
| this|
|  was|
|  the|
|first|
| time|
+-----+
only showing top 5 rows
# Let's filter out all empty rows in the dataframe.
# Hint: You may use the length() method provided to select rows where sentence length is greater than 0
from pyspark.sql.functions import length
#YOUR CODE HERE
DF3 = DF2.filter(length(DF2.word) >= 1)
DF3.show(25)
+--------------+
|          word|
+--------------+
|          this|
|           was|
|           the|
|         first|
|          time|
|             i|
|          read|
|garciaaguilera|
|             i|
|          came|
|          upon|
|           the|
|          name|
|            of|
|          this|
|          book|
|            on|
|          live|
|          with|
|         regis|
|           and|
|         kelly|
|          this|
|          book|
|           was|
+--------------+
only showing top 25 rows
# Group the dataframe by unique words, then count each group
# Hint: how do you group rows in a DataFrame?
# YOUR CODE HERE
DF4 = DF3.groupby('word').count()
DF4.show(25)
+--------------------+-----+
|                word|count|
+--------------------+-----+
|              online|40394|
|              brands|19389|
|           squealing|  138|
|               input|33909|
|            priority| 2840|
|             flashed|  806|
|                hope|33903|
|            12months|   16|
|            everyday|12140|
|             embrace|  128|
|              filing|  538|
|       userinterface|   90|
|   purchasesupgrades|    1|
|               bebut|   24|
|             gazebos|    2|
|              outfit|  543|
|           invisable|   21|
|            techsavy|   59|
|             elevate|  409|
|             comping|    6|
|           viewpoint|  129|
|              spared|  162|
|            tiltonly|   14|
|           batteryit|   29|
|overtheshouldersa...|    1|
+--------------------+-----+
only showing top 25 rows
Your task: Sort the dataframe by the count column in descending order, then print the first 25 rows.
Expected output:
# Sort the dataframe by the 'count' column
# Hint: the DataFrame.count() function collides with the counts column we want to use.
# How else can we specify the column to sort by?
# Uncomment the next two lines and fill your code
wordCountsSortedDF = DF4.sort("count",ascending=False)
wordCountsSortedDF.show(25)
+-----+--------+
| word|   count|
+-----+--------+
|  the|10642903|
|    i| 6326216|
|   to| 5607568|
|  and| 5537690|
|    a| 5166838|
|   it| 4654902|
|   is| 3242588|
|  for| 2860227|
| this| 2845219|
|   of| 2782166|
|   my| 2319813|
|   in| 2147373|
| with| 2046990|
| that| 1983044|
|   on| 1758801|
|  you| 1754054|
| have| 1632887|
|  but| 1508591|
|  not| 1460730|
|  was| 1434985|
|   as| 1185866|
|  are| 1007811|
|   so|  994529|
|great|  988223|
| very|  893737|
+-----+--------+
only showing top 25 rows
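To answer the hint above: one alternative is to pass a Column object instead of the column name, which avoids any confusion with the DataFrame.count() method. A sketch of an equivalent sort:

# Equivalent: sort by a Column object in descending order
from pyspark.sql.functions import col
wordCountsSortedDF = DF4.sort(col("count").desc())
wordCountsSortedDF.show(25)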
Your task: Print the time elapsed since the start of execution.
Expected output: The execution time. No particular value is expected.
# Print the time since execution start - You will need this value later.
print(time.time() - start_time)
51.43699026107788
NOTE: Spark uses a distributed memory system and stores working data in fragments known as "partitions". This is advantageous when a Spark cluster spans multiple machines, as each machine only requires part of the working data to do its own job. By default, Spark saves each of these data partitions to an individual file to avoid I/O collisions. We want only one output file, so we'll need to fuse all the data into a single partition first.
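For example, a quick way to observe the partitioning (assuming the dataframes defined above):

# Inspect the number of partitions before and after coalescing
print(wordCountsSortedDF.rdd.getNumPartitions())              # typically > 1 on a cluster
print(wordCountsSortedDF.coalesce(1).rdd.getNumPartitions())  # 1 after coalescing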
Your task: Save the sorted word counts dataframe to your own S3 bucket as a single CSV file, at a path of the form s3://<your-bucket>/<your-folder>/<your-result-file>.csv. Change these parameters to point to your bucket and folder.
# Save results to S3
wordCountsSortedDF.coalesce(1).write.csv("s3://week9-mgta495/data/week9-data/result.csv", header=True, mode="overwrite")
# Stop Spark session
spark.stop()
Inside the output folder on S3 you will find a single CSV file with a name like part-00000-xx.....xx.csv. Download it and rename it to results.csv. Then extract the first 100 lines, for example by running head -n 100 results.csv > 100_rows.csv on a terminal. You can also do this manually, since CSV files are plain text. Remember that we want the first 100 lines, which include the header - so it is the header plus 99 rows.

The autograder will check whether the results you submit in the 100_rows.csv file match the expected results exactly. The autograder runs on your submitted csv file and not on the notebook, so you are free to change the notebook in any way you want.
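If you prefer not to use a terminal, a small plain-Python alternative (run it locally on the downloaded file, not in this notebook; it assumes you renamed the download to results.csv):

# Write the header plus the first 99 data rows of results.csv to 100_rows.csv
with open("results.csv") as src, open("100_rows.csv", "w") as dst:
    for i, line in enumerate(src):
        if i >= 100:  # keep only the first 100 lines (header + 99 rows)
            break
        dst.write(line)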
The csv file would look something like this - The top words and their counts are shown for illustration - The actual words and their counts will differ in your output:
word,count
the,123
i,121
and,99
...96 more rows
You need to experiment with different numbers of master and worker nodes when running this whole Jupyter Notebook, and report the execution time of the Notebook as you recorded it in the earlier section.
Fill in the times in the table below.
Dataset | #Master Nodes | #Core Nodes | Runtime_1 (s) | Runtime_2 (s) | Runtime_3 (s) | Mean (s) | Std (s) |
---|---|---|---|---|---|---|---|
1M | 1 | 1 | 44.94 | 44.98 | 44.87 | 44.93 | 0.056 |
5M | 1 | 1 | 114.71 | 114.89 | 114.76 | 114.79 | 0.093 |
5M | 1 | 3 | 49.48 | 49.52 | 49.58 | 49.53 | 0.050 |
You need to add a screenshot of your Amazon EMR 'Clusters' page which shows that all of your clusters have been terminated after you are done with your assignment.
To submit a screenshot, just follow the instructions on Gradescope for submitting it. There will be a separate submission for this screenshot.