MGTA 495: Analytics Assignment 2 - Week 7 - Word Count

Setup of PySpark

Tasks:

Due date: 23rd February 11:59 PM PST


Remember: when in doubt, read the documentation first. It's always helpful to search for the class that you're trying to work with, e.g. pyspark.sql.DataFrame.

PySpark API Documentation: https://spark.apache.org/docs/latest/api/python/index.html

Spark DataFrame Guide: https://spark.apache.org/docs/latest/sql-programming-guide.html

1. Copy data file(s) to HDFS

Spark works best with the Hadoop Distributed File System (HDFS). Here are the steps to start the HDFS service in this container and to copy files between the local file system and HDFS:

  1. Make sure you have the 3 scripts you'll need to start/stop the HDFS service: init-dfs.sh, start-dfs.sh, stop-dfs.sh. Copy these scripts from Canvas to your local folder.
  2. Open a terminal tab by going to File->New->Terminal. Use cd to navigate to the directory where you copied the scripts. You can do this either using a terminal from JupyterLab or from the ZSH terminal on the container.
  3. Run bash ./init-dfs.sh to initialize the HDFS service. If prompted, answer y to erase HDFS.
  4. Run bash ./start-dfs.sh to start the HDFS service. When this is complete, run jps to confirm that you can see NameNode, DataNode and SecondaryNameNode running.
  5. If running a local container, use cd to navigate to the directory where you have the downloaded data file. If running on the RSM server, copy the file to the container from the distribution folder.
  6. Run hadoop fs -mkdir /W7 - This will create a directory named W7 in the root directory of your HDFS file system.
  7. Now run hadoop fs -copyFromLocal <your-data-file> /W7 to copy the data file to the W7 folder of the Hadoop file system that you just created. You may run hadoop fs -help to learn more about navigating and using HDFS manually. After you're done, run hadoop fs -ls /W7 to list its contents and check that the file was copied.

Expected output: None

2. Start Spark Session

Expected Output: None
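
No output is expected here, but a minimal sketch of the session setup might look like the following (the variable names and the local[1] master are assumptions; section 10 asks you to vary the core count):

import pyspark
from pyspark.sql import SparkSession

# Run Spark locally; local[1] means 1 core (see section 10 for varying this).
conf = pyspark.SparkConf().setAll([('spark.master', 'local[1]'),
                                   ('spark.app.name', 'Word Count')])

# Create (or reuse) a SparkSession with this configuration.
spark = SparkSession.builder.config(conf=conf).getOrCreate()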

3. Load Data

Read the data from the BookReviews_1M.txt file. You can find this file on Google Drive (the link is shared on Piazza); you will need to copy it to your HDFS file system using the commands described in section 1.

Expected output: None
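
For example, if you copied the file to the /W7 directory in HDFS as in section 1, reading it might look like this (the dataframe name rawDF and the exact HDFS path are assumptions):

# Read the raw text file from HDFS; each line becomes one row in a
# single string column named "value".
rawDF = spark.read.text("hdfs:///W7/BookReviews_1M.txt")

rawDF.printSchema()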

4. Clean the data

Your task:

  1. Remove all punctuation and convert all characters to lower case.

Expected output:

  1. The first 25 rows of a dataframe, with a column containing the cleaned sentences.

Your output would look like this:

+--------------------+
| sentence|
+--------------------+
|this was the firs...|
|also after going ...|
|as with all of ms...|
|ive not read any ...|
|this romance nove...|
|carolina garcia a...|
|not only can she ...|
|once again garcia...|
|the timing is jus...|
|engaging dark rea...|
|set amid the back...|
|this novel is a d...|
|if readers are ad...|
| reviewed by phyllis|
| apooo bookclub|
|a guilty pleasure...|
|in the tradition ...|
|beryl unger top e...|
|what follows is a...|
|the book flap say...|
|id never before r...|
|the novels narrat...|
|it is centered on...|
|if you like moder...|
|beryl unger is a ...|
+--------------------+
only showing top 25 rows
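
One possible way to do the cleaning with pyspark.sql.functions is sketched below, assuming the raw dataframe from section 3 is named rawDF; the exact regular expression you use for punctuation is up to you:

from pyspark.sql import functions as F

# Strip everything that is not a letter, digit, or space, then lower-case
# and trim. "value" is the default column name from spark.read.text.
cleanDF = rawDF.select(
    F.trim(F.lower(F.regexp_replace(F.col("value"), r"[^a-zA-Z0-9 ]", ""))).alias("sentence")
)

cleanDF.show(25)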

5. Get dataframe containing unique words and their counts

Your task:

  1. Split each sentence into words, using the space character (' ') as the delimiter.
  2. Put each word from each sentence into its own row. Put your results into a new dataframe.
  3. Print the first 5 rows of the dataframe.

Expected output:

  1. The first 5 rows of the output dataframe, which should look like:
+-----+
| word|
+-----+
| this|
| was|
| the|
|first|
| time|
+-----+
only showing top 5 rows
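
A sketch of one way to do this with split and explode, assuming the cleaned dataframe from section 4 is named cleanDF:

from pyspark.sql import functions as F

# Split each sentence on spaces, then explode the resulting array so that
# every word ends up in its own row.
wordsDF = cleanDF.select(F.explode(F.split(F.col("sentence"), " ")).alias("word"))

wordsDF.show(5)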

Your task:

  1. Remove all empty rows from the dataframe. These may have crept in because of empty lines in the file or empty strings produced when splitting.
  2. Group the rows of the previous dataframe by unique words, then count the rows in each group. Put your results into a new dataframe.

Expected output:

  1. Show the first 25 rows of the dataframe, where each row contains only one word. The dataframe must not contain empty rows.
  2. Show 25 rows of the dataframe containing unique words and their counts

The output after filtering empty rows would be:

+--------------+
| word|
+--------------+
| this|
| was|
| the|
| first|
| time|
| i|
| read|
|garciaaguilera|
| i|
| came|
| upon|
| the|
| name|
| of|
| this|
| book|
| on|
| live|
| with|
| regis|
| and|
| kelly|
| this|
| book|
| was|
+--------------+
only showing top 25 rows


The output after grouping unique words would be:
+-----------+-----+
| word|count|
+-----------+-----+
| still|52574|
| hope| 6729|
| some|74982|
| those|22067|
| few|33375|
| degrade| 343|
| bookshelf| 900|
| amazonings| 1|
| recognize| 2008|
| inner| 819|
| harder| 1441|
| lyrical| 14|
| viewpoint| 37|
|handicapped| 51|
| spoil| 84|
| historys| 2|
| everyday| 2493|
| meursault| 1|
| art| 1291|
| involving| 142|
| connected| 9172|
| spared| 43|
| doubts| 320|
| 1970s| 175|
| brands| 4228|
+-----------+-----+
only showing top 25 rows
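
One way to filter the empty rows and build the counts is sketched below, assuming wordsDF from the previous step (the dataframe names are assumptions):

from pyspark.sql import functions as F

# Drop the empty strings produced by blank lines or repeated spaces.
filteredDF = wordsDF.filter(F.col("word") != "")
filteredDF.show(25)

# Group by unique word and count the rows in each group.
wordCountsDF = filteredDF.groupBy("word").count()
wordCountsDF.show(25)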

6. Sort the word count dataframe

Your task:

  1. Sort the previous dataframe by the counts column. Put your results into a new dataframe.

Expected output:

  1. First 25 rows of the sorted word count dataframe.
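
A sketch, assuming the counts dataframe is named wordCountsDF; the sample CSV at the end of this notebook lists the most frequent words first, so the sort here is descending:

from pyspark.sql import functions as F

# Sort by count, most frequent words first.
sortedWordCountsDF = wordCountsDF.orderBy(F.desc("count"))

sortedWordCountsDF.show(25)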

7. Record the execution time

Your task:

  1. Print the execution time.

Expected output: The execution time. No particular value is expected.
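
One simple approach is to record a wall-clock timestamp near the top of the notebook and another one here, e.g. with Python's time module (the variable names are assumptions):

import time

start_time = time.time()   # in the notebook, run this line in an early cell

# ... all the Spark work from sections 2-6 runs in between ...

end_time = time.time()
print(f"Execution time: {end_time - start_time:.2f} seconds")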

8. Save the sorted word counts to HDFS as a CSV file

NOTE: Spark uses a distributed memory system and stores working data in fragments known as "partitions". This is advantageous when a Spark cluster spans multiple machines, since each machine only needs the part of the working data required for its own job. By default, Spark saves each of these partitions to an individual file to avoid I/O collisions. We want only one output file, so we'll need to fuse all the data into a single partition first.

Your task:

  1. Coalesce the previous dataframe to one partition. This makes sure that all our results will end up in the same CSV file.
  2. Save the 1-partition dataframe to HDFS using the DataFrame.write.csv() method. Make sure to store the file inside HDFS, at a location you can remember. We recommend saving to the root directory of HDFS, i.e. hdfs:///<your-result-file>.csv, for convenience.

Expected output: None
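
A sketch of the write step, assuming the sorted dataframe is named sortedWordCountsDF and using a hypothetical output path; the header option keeps the column names, which the submitted CSV is expected to include:

# Coalesce to a single partition so all rows land in one CSV part file,
# then write it to HDFS with a header row.
(sortedWordCountsDF
    .coalesce(1)
    .write
    .option("header", True)
    .mode("overwrite")
    .csv("hdfs:///wordcount_results.csv"))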

9. Copy the results from HDFS to the local file system

Now that we have our results stored in HDFS, we need to copy them back to the local file system to access them. This process may sound cumbersome, but it is a necessary consequence of Spark and Hadoop's distributed architecture and their ability to scale to arbitrarily large datasets and computations.

Copying the results from HDFS to the local file system is fairly simple. Here are the steps:

  1. Run hadoop fs -ls / to list the root directory of the HDFS. You should see the CSV file that you have saved. Counterintuitively, this CSV file is actually a folder, which contains individually saved files from each partition of the saved dataframe (see above for data partitioning).
  2. Run hadoop fs -ls /<your-result-file>.csv/ to see what's inside the saved folder. Since we coalesced our dataframe to just one partition, we should find only one saved partition in this folder, itself saved as a CSV. Note the name of this file; it should look something like part-00000-xx.....xx.csv.
  3. Run hadoop fs -copyToLocal /<your-result-file>.csv/part-00000-xx.....xx.csv to copy the results CSV from HDFS to the current folder on your local file system. You may rename this file to something more interpretable - let's say results.csv.
  4. If you're done, remember to stop the HDFS service by running bash ./stop-dfs.sh from the directory where you have the scripts.
  5. We want you to submit a CSV containing the first 100 lines of the results file. To do this, use the command head -n 100 results.csv > 100_rows.csv. You can also do this manually, since CSV files are plain text. Remember that the first 100 lines include the header, so this is the header plus 99 data rows.

  6. Remember that you will need to submit this CSV file, a PDF of this Jupyter Notebook, and the .py file generated by converting this notebook to a .py file.

Expected Output: None

10. Execution times on different number of cores

You need to experiment with running this whole Jupyter Notebook using different numbers of cores.

After writing all of the expected code before this cell, set the configuration at the beginning of this notebook, in the cell containing this code: conf = pyspark.SparkConf().setAll([('spark.master', 'local[1]'), ('spark.app.name', 'Word Count')])

Use the following values for number of cores - 1, 2, and 4.
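
For example, to rerun with 2 cores you would only change the master string in that configuration cell and leave everything else as is:

import pyspark

# local[N] tells Spark to use N local cores; repeat the full run with
# local[1], local[2], and local[4].
conf = pyspark.SparkConf().setAll([('spark.master', 'local[2]'),
                                   ('spark.app.name', 'Word Count')])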

Then go to the Kernel menu in JupyterLab and choose 'Restart and run all cells'. Note the time reported in the cell just before section 8 - this is the time it took for all the code to run.

Fill in the times in the table below.

#Cores   Runtime_1   Runtime_2   Runtime_3   Mean    Std
1        90.33       90.61       90.71       90.55   0.197
2        44.35       44.65       44.67       44.56   0.179
4        34.14       34.62       34.34       34.37   0.241

Note on Autograder

The autograder will check whether the results you submit in the 100_rows.csv file match the expected results exactly. The autograder runs on your submitted CSV file, not on the notebook, so you are free to change the notebook in any way you want.

The CSV file will look something like this (the counts are shown for illustration only; your counts will differ):

word,count
the,123
i,121
and,99
...97 more rows