Remember: when in doubt, read the documentation first. It's always helpful to search for the class that you're trying to work with, e.g. pyspark.sql.DataFrame.
PySpark API Documentation: https://spark.apache.org/docs/latest/api/python/index.html
Spark DataFrame Guide: https://spark.apache.org/docs/latest/sql-programming-guide.html
Spark works best with the Hadoop File System (HDFS). Here are the steps you need to start up HDFS in this container and copy files between the local file system and HDFS:
1. Copy the scripts init-dfs.sh, start-dfs.sh, and stop-dfs.sh from Canvas to your local folder.
2. cd to the directory where you copied the scripts. You can do this either from a terminal in JupyterLab or from the ZSH terminal on the container.
3. Run bash ./init-dfs.sh to initialize the HDFS service. If prompted, answer y to erase HDFS.
4. Run bash ./start-dfs.sh to start the HDFS service. When this is complete, run jps to confirm that NameNode, DataNode, and SecondaryNameNode are running.
5. cd to the directory where you have the downloaded data file. If running on the RSM server, copy the file to the container from the distribution folder.
6. Run hadoop fs -mkdir /W7 to create a directory named W7 in the root directory of your HDFS file system.
7. Run hadoop fs -copyFromLocal <your-data-file> /W7 to copy the data file to the W7 folder you just created. You may run hadoop fs -help to learn more about navigating and using HDFS manually.
8. After you're done, run hadoop fs -ls / to list the files and check that the file was copied.
Expected output: None
Expected Output: None
# initialize Spark
import pyspark
from pyspark.sql import SparkSession, Row
conf = pyspark.SparkConf().setAll([('spark.master', 'local[4]'),
('spark.app.name', 'Word Count')])
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Verify that the correct versions of spark and pyspark are installed.
# print (spark.version, pyspark.version.__version__)
assert(spark.version == '3.0.1')
assert(pyspark.version.__version__ == '3.0.1')
# record the starting time of execution for timing this notebook
import time
start_time = time.time()
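If you want to confirm how many cores the local master is actually using (useful for the scaling experiment in section 9), one simple probe is the sketch below; for a local[N] master this typically reports N.
# Optional: report the parallelism of the local master (typically N for local[N])
print(spark.sparkContext.defaultParallelism)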
Read data from the BookReviews_1M.txt file. You can find this file on Google Drive (the link is shared on Piazza); you will need to copy it to your HDFS file system using the commands described in section 1.
Expected output: None
# Read data from HDFS
dataFileName = "hdfs:///W7/BookReviews_1M.txt"
# Read data from the file above, convert it to a dataframe.
df = spark.read.text(dataFileName) \
.cache()
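Optionally, you can peek at what spark.read.text produced; it yields a DataFrame with a single string column named value, one row per line of the input file.
# Optional sanity check: one string column named "value", one row per input line
df.printSchema()   # root |-- value: string (nullable = true)
print(df.count())  # should be on the order of 1,000,000 lines for BookReviews_1M.txt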
Your task:
Expected output:
Your output would look like this:
# We provide the following function for building a column expression for Task 1.
# Do not change this cell.
# NOTE: Counterintuitively, column objects do NOT store any data; instead they store column expressions (transformations).
# The below function takes in a column object, and adds more expressions to it to make a more complex transformation.
# Once we have a column object representing the expressions we want, use DataFrame.select(column) to apply the expressions
from pyspark.sql.functions import regexp_replace, trim, col, lower
def removePunctuation(column):
"""Removes punctuation, changes to lower case, and strips leading and trailing spaces."""
return trim(lower(regexp_replace(column, "[^A-Za-z0-9 ]", ""))).alias("sentence")
# Recommended: take a look at the contents of a column object returned from removePunctuation. What's in there?
print(removePunctuation(df.value))
Column<b'trim(lower(regexp_replace(value, [^A-Za-z0-9 ], ))) AS `sentence`'>
# execute the column expressions generated by removePunctuation() to clean the sentences
# After that, use the show() function to print the first 25 rows of the dataframe
# Hint: you'll need the Column object returned by removePunctuation().
df_rmv_punc = df.select(removePunctuation(df.value))
df_rmv_punc.show(25)
+--------------------+
|            sentence|
+--------------------+
|this was the firs...|
|also after going ...|
|as with all of ms...|
|ive not read any ...|
|this romance nove...|
|carolina garcia a...|
|not only can she ...|
|once again garcia...|
|the timing is jus...|
|engaging dark rea...|
|set amid the back...|
|this novel is a d...|
|if readers are ad...|
| reviewed by phyllis|
|      apooo bookclub|
|a guilty pleasure...|
|in the tradition ...|
|beryl unger top e...|
|what follows is a...|
|the book flap say...|
|id never before r...|
|the novels narrat...|
|it is centered on...|
|if you like moder...|
|beryl unger is a ...|
+--------------------+
only showing top 25 rows
Your task:
Expected output:
# We assemble the 'split' and 'explode' column expressions, then apply them to the sentence column
from pyspark.sql.functions import split, explode
# YOUR CODE HERE for printing the first 5 rows of the dataframe after the required operations
df1 = df_rmv_punc.select(split("sentence", " ").alias('csv'))
df2 = df1.select(explode('csv').alias('word'))
df2.show(5)
+-----+
| word|
+-----+
| this|
|  was|
|  the|
|first|
| time|
+-----+
only showing top 5 rows
The output after filtering empty rows would be:
# Let's filter out all empty rows in the dataframe.
from pyspark.sql.functions import length
# Hint: You may use the length() function imported above to select rows where the word length is greater than 0
df3 = df2.filter(length("word") > 0)
df3.show(25)
+--------------+
|          word|
+--------------+
|          this|
|           was|
|           the|
|         first|
|          time|
|             i|
|          read|
|garciaaguilera|
|             i|
|          came|
|          upon|
|           the|
|          name|
|            of|
|          this|
|          book|
|            on|
|          live|
|          with|
|         regis|
|           and|
|         kelly|
|          this|
|          book|
|           was|
+--------------+
only showing top 25 rows
# Group the dataframe by unique words, then count each group
# Hint: how do you group rows in a DataFrame?
# YOUR CODE HERE
df4 = df3.groupBy('word').count()
df4.show(25)
+-----------+-----+
|       word|count|
+-----------+-----+
|      still|52574|
|       hope| 6729|
|       some|74982|
|      those|22067|
|        few|33375|
|    degrade|  343|
|  bookshelf|  900|
| amazonings|    1|
|  recognize| 2008|
|      inner|  819|
|     harder| 1441|
|    lyrical|   14|
|  viewpoint|   37|
|handicapped|   51|
|      spoil|   84|
|   historys|    2|
|   everyday| 2493|
|  meursault|    1|
|        art| 1291|
|  involving|  142|
|  connected| 9172|
|     spared|   43|
|     doubts|  320|
|      1970s|  175|
|     brands| 4228|
+-----------+-----+
only showing top 25 rows
Your task:
Expected output:
# Sort the dataframe by the 'count' column
# Hint: the DataFrame.count() function collides with the counts column we want to use.
# How else can we specify the column to sort by?
# Uncomment the next two lines and fill your code
# wordCountsSortedDF = <Your code>
# wordCountsSortedDF.show(25)
wordCountsSortedDF = df4.sort("count", ascending=False)
wordCountsSortedDF.show(25)
+-----+-------+
| word|  count|
+-----+-------+
|  the|2053274|
|    i|1228198|
|  and|1079515|
|   to|1070092|
|    a|1026310|
|   it| 850403|
|   is| 633026|
|  for| 574222|
|   of| 568148|
| this| 552837|
|   my| 446637|
|   in| 420544|
| with| 398539|
| that| 387286|
|  you| 359814|
|   on| 337447|
| have| 322965|
|  but| 293731|
|  not| 279774|
|  was| 259645|
|   as| 234367|
|  are| 217605|
|great| 195349|
|   so| 188233|
| they| 175861|
+-----+-------+
only showing top 25 rows
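As the hint in the previous cell suggests, there is more than one way to refer to the counts column; an equivalent formulation uses an explicit Column expression, which avoids any confusion with the DataFrame.count() method:
# Equivalent sort using an explicit Column expression instead of the column name string
from pyspark.sql.functions import col
wordCountsSortedDF = df4.sort(col("count").desc())
wordCountsSortedDF.show(25)  # same output as above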
Your task:
Expected output: The execution time. No particular value is expected.
# print the time since execution start - This will be needed in section 10.
print(time.time() - start_time)
34.142656087875366
NOTE: Spark uses a distributed memory system, and stores working data in fragments known as "partitions". This is advantageous when a Spark cluster spans multiple machines, as each machine only requires part of the working data to do its own job. By default, Spark saves each of these data partitions into an individual file to avoid I/O collisions. We want only one output file, so we'll need to fuse all the data into a single partition first.
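To see this partitioning in action, a quick (optional) sketch like the one below inspects the number of partitions before and after coalescing; the count before coalescing depends on your data and shuffle settings.
# Optional: inspect partition counts before and after coalescing to one partition
print(wordCountsSortedDF.rdd.getNumPartitions())              # depends on shuffle settings
print(wordCountsSortedDF.coalesce(1).rdd.getNumPartitions())  # 1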
Your task: Save the sorted word counts to HDFS as a single CSV file, named hdfs:///<your-result-file>.csv, for convenience.
Expected output: None
# Save results to HDFS
wordCountsSortedDF.coalesce(1).write.csv("hdfs:///wordCountsSorted.csv", header=True, mode="overwrite")
# Stop Spark session
spark.stop()
Now that we have our results stored in HDFS, we need to copy them back to the local file system to access them. This process may sound cumbersome, but it is a necessary consequence of Spark and Hadoop's distributed architecture and their ability to scale up to arbitrarily large datasets and computing operations.
Copying the results from HDFS to the local file system is fairly simple. Here are the steps:
1. Run hadoop fs -ls / to list the root directory of HDFS. You should see the CSV file that you saved. Counterintuitively, this CSV "file" is actually a folder, which contains individually saved files from each partition of the saved dataframe (see above for data partitioning).
2. Run hadoop fs -ls /<your-result-file>.csv/ to see what's inside the saved folder. Since we coalesced the dataframe to a single partition, you should find only one saved partition in this folder, itself saved as a CSV. Note the name of this file; it should look something like part-00000-xx.....xx.csv.
3. Run hadoop fs -copyToLocal /<your-result-file>.csv/part-00000-xx.....xx.csv to copy the results CSV from HDFS to the current folder on your local file system. You may rename this file to something more interpretable, say results.csv.
4. Run bash ./stop-dfs.sh from the directory where you have the scripts.
5. Extract the first 100 lines with head -n 100 results.csv > 100_rows.csv (a quick check is sketched after this list). You can also do this manually, since CSV files are plain text. Remember that we want the first 100 lines, which include the header, so that is the header plus 99 data rows.
6. Remember that you will need to submit this CSV file, a PDF of this Jupyter Notebook, and the .py file, which you can generate by converting this Notebook to a .py file.
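A minimal sketch for the check referenced in step 5, assuming you run it from the directory containing 100_rows.csv:
# Optional check that 100_rows.csv contains exactly 100 lines (header + 99 data rows)
with open("100_rows.csv") as f:
    lines = f.read().splitlines()
print(len(lines))  # expect 100
print(lines[0])    # expect the header line: word,count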
Expected Output: None
You need to experiment with running this whole Jupyter Notebook using different numbers of cores.
After writing all of the expected code before this cell, set the configuration at the beginning of this Notebook in the cell that contains this code:
conf = pyspark.SparkConf().setAll([('spark.master', 'local[1]'),
('spark.app.name', 'Word Count')])
Use the following values for the number of cores: 1, 2, and 4.
Then go to the Kernel tab in JupyterLab and select 'Restart and run all cells.' Note the time printed in the cell just before section 8; this is the time it took for all the code to run.
Fill in the times (in seconds) in the table below.
| #Cores | Runtime_1 | Runtime_2 | Runtime_3 | Mean | Std |
|---|---|---|---|---|---|
| 1 | 90.33 | 90.61 | 90.71 | 90.55 | 0.197 |
| 2 | 44.35 | 44.65 | 44.67 | 44.56 | 0.179 |
| 4 | 34.14 | 34.62 | 34.34 | 34.37 | 0.241 |
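To fill in the Mean and Std columns, you can compute the sample mean and standard deviation of your three measured runtimes, for example with Python's statistics module (the values below are the ones from the table; substitute your own measurements):
# Compute the mean and sample standard deviation of the measured runtimes (seconds)
import statistics
runtimes = {
    1: [90.33, 90.61, 90.71],  # replace with your own measurements
    2: [44.35, 44.65, 44.67],
    4: [34.14, 34.62, 34.34],
}
for cores, times in runtimes.items():
    print(cores, round(statistics.mean(times), 2), round(statistics.stdev(times), 3))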
The autograder will check whether the results you submit in the 100_rows.csv file exactly match the expected results. The autograder runs on your submitted CSV file, not on the notebook, so you are free to change the notebook in any way that you want.
The CSV file would look something like this (the counts are shown for illustration; your counts will differ):
word,count
the,123
i,121
and,99
...97 more rows