Linear Regression on the Boston Housing dataset.
Submission
Remember: when in doubt, read the documentation first. It's always helpful to search for the class that you're trying to work with, e.g. pyspark.sql.DataFrame.
PySpark API Documentation: https://spark.apache.org/docs/latest/api/python/index.html
Spark DataFrame Guide: https://spark.apache.org/docs/latest/sql-programming-guide.html
from sklearn.utils import shuffle
import os
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import pickle
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
%matplotlib inline
# initialize Spark
import pyspark
import pyspark.sql.functions as f
from pyspark.sql import SparkSession, Row
conf = pyspark.SparkConf().setAll([('spark.master', 'local[2]'),
                                   ('spark.app.name', 'Linear Regression')])
spark = SparkSession.builder.config(conf=conf).getOrCreate()
assert(spark.version == '3.0.1')
assert(pyspark.version.__version__ == '3.0.1')
pwd
'/home/jovyan/Desktop/MGTA495/week8'
# Path of the Bostong_Housing_With_Headers.csv file
file_path = "Bostong_Housing_With_Headers.csv"
pdf = pd.read_csv(file_path)
df = spark.createDataFrame(pdf)
CRIM — per capita crime rate by town.
ZN — proportion of residential land zoned for lots over 25,000 sq.ft.
INDUS — proportion of non-retail business acres per town.
CHAS — Charles River dummy variable (= 1 if tract bounds river; 0 otherwise).
NOX — nitrogen oxides concentration (parts per 10 million).
RM — average number of rooms per dwelling.
AGE — proportion of owner-occupied units built prior to 1940.
DIS — weighted mean of distances to five Boston employment centres.
RAD — index of accessibility to radial highways.
TAX — full-value property-tax rate per $10,000.
PTRATIO — pupil-teacher ratio by town.
BLACK — 1000(Bk - 0.63)² where Bk is the proportion of blacks by town.
LSTAT — lower status of the population (percent).
MV — median value of owner-occupied homes in $1000s. This is the target variable.
# See what one row of the Spark dataframe looks like.
df.take(1)
[Row(CRIM=0.00632, ZN=18.0, INDUS=2.31, CHAS=0, NOX=0.538, RM=6.575, AGE=65.2, DIS=4.09, RAD=1, TAX=296.0, PTRATIO=15.3, BLACK=396.9, LSTAT=4.98, MV=24.0)]
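With the columns described above in mind, it can also help to glance at the target's distribution before modeling. A minimal sketch, using the df created above (describe() and approxQuantile() are standard DataFrame methods):
# Sketch: summary statistics and approximate quartiles for the target variable MV.
df.select('MV').describe().show()
quartiles = df.approxQuantile('MV', [0.25, 0.5, 0.75], 0.01)
print("MV quartiles (25%, 50%, 75%):", quartiles)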
# Helper function for filling columns using mean or median strategy
# You don't need to change anything in this cell.
from pyspark.ml.feature import Imputer
def fill_na(df, strategy):
    imputer = Imputer(
        strategy=strategy,
        inputCols=df.columns,
        outputCols=["{}_imputed".format(c) for c in df.columns]
    )
    new_df = imputer.fit(df).transform(df)
    # Select the newly created columns with all filled values
    new_df = new_df.select([c for c in new_df.columns if "imputed" in c])
    for col in new_df.columns:
        new_df = new_df.withColumnRenamed(col, col.split("_imputed")[0])
    return new_df
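Before using the imputer, it can be useful to check how many nulls each column actually contains. A minimal sketch using pyspark.sql.functions (imported above as f):
# Sketch: count null values per column of the raw dataframe.
# (Values that arrived from pandas as NaN can be checked separately with f.isnan.)
null_counts = df.select([f.sum(f.col(c).isNull().cast('int')).alias(c) for c in df.columns])
null_counts.show()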
# These are the column names in the csv file as described above.
col_names = ['CRIM' , 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'BLACK', 'LSTAT', 'MV']
feature_selection_df = df.select(col_names)
feature_selection_df.printSchema()
# Drop the NA's in the target variable - which is MV.
cleaned_df = df.na.drop(how='any', subset='MV')
# Fill the NA's using a mean strategy
cleaned_df = fill_na(cleaned_df, 'mean')
print('{} non-null rows'.format(cleaned_df.count()))
cleaned_df.show(2)
root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: long (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: long (nullable = true)
 |-- TAX: double (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- BLACK: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MV: double (nullable = true)

506 non-null rows
+-----+-----+----+----+-------+-------+------+---+-----+----+-----+----+-----+-----+
|INDUS|  NOX| AGE|CHAS|PTRATIO|   CRIM|   DIS|RAD|   RM|  ZN|  TAX|  MV|BLACK|LSTAT|
+-----+-----+----+----+-------+-------+------+---+-----+----+-----+----+-----+-----+
| 2.31|0.538|65.2|   0|   15.3|0.00632|  4.09|  1|6.575|18.0|296.0|24.0|396.9| 4.98|
| 7.07|0.469|78.9|   0|   17.8|0.02731|4.9671|  2|6.421| 0.0|242.0|21.6|396.9| 9.14|
+-----+-----+----+----+-------+-------+------+---+-----+----+-----+----+-----+-----+
only showing top 2 rows
# We'll use the VectorAssembler package to create the feature vector.
from pyspark.ml.feature import VectorAssembler
# Create a vector column composed of all the features - Don't include the label here since label isn't a feature
features = ['CRIM','ZN','INDUS','CHAS','NOX','RM','AGE','DIS','RAD','TAX','PTRATIO','BLACK','LSTAT']
assembler = VectorAssembler(inputCols=features,
                            outputCol='featureVector')
features_DF = assembler.transform(df)
# After creating the dataframe, print the first 5 rows of that as the output of this cell
features_DF.show(5)
+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+----+--------------------+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM| AGE|   DIS|RAD|  TAX|PTRATIO| BLACK|LSTAT|  MV|       featureVector|
+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+----+--------------------+
|0.00632|18.0| 2.31|   0|0.538|6.575|65.2|  4.09|  1|296.0|   15.3| 396.9| 4.98|24.0|[0.00632,18.0,2.3...|
|0.02731| 0.0| 7.07|   0|0.469|6.421|78.9|4.9671|  2|242.0|   17.8| 396.9| 9.14|21.6|[0.02731,0.0,7.07...|
|0.02729| 0.0| 7.07|   0|0.469|7.185|61.1|4.9671|  2|242.0|   17.8|392.83| 4.03|34.7|[0.02729,0.0,7.07...|
|0.03237| 0.0| 2.18|   0|0.458|6.998|45.8|6.0622|  3|222.0|   18.7|394.63| 2.94|33.4|[0.03237,0.0,2.18...|
|0.06905| 0.0| 2.18|   0|0.458|7.147|54.2|6.0622|  3|222.0|   18.7| 396.9| 5.33|36.2|[0.06905,0.0,2.18...|
+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+----+--------------------+
only showing top 5 rows
# Split the dataframe using the randomSplit() function
# into a training dataframe and a test dataframe with a 75:25 split between them
# Use seed=42 as one the parameters of the randomSplit() function to maintain consistency among all submissions.
DF2 = features_DF.select('featureVector', 'MV')
train_DF, test_DF = DF2.randomSplit([0.75, 0.25], seed=42)
print("Train set size:",str(train_DF.count()) , "Test set size:", str(test_DF.count()))
Train set size: 393 Test set size: 113
# Use the StandardScaler to standardize your data.
from pyspark.ml.feature import StandardScaler
# Pass required parameters and scale with mean and std both. - Scale training data.
scaler = StandardScaler(withStd=True, withMean=True, inputCol='featureVector', outputCol='scaled')
scalerModel = scaler.fit(train_DF)
# Scale your training and test data with the same mean and std that you'll get from the scaler.
# Hint: scalerModel.mean, scalerModel.std - You can use this to see the mean and std for each feature.
train_DF = scalerModel.transform(train_DF)
test_DF = scalerModel.transform(test_DF)
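As the hint above notes, the fitted scaler exposes the statistics it applies to both splits. A quick sketch for eyeballing them (assuming the features list defined earlier):
# Sketch: per-feature mean and standard deviation learned on the training split.
for name, mu, sigma in zip(features, scalerModel.mean.toArray(), scalerModel.std.toArray()):
    print('{:>8s}  mean = {:10.4f}  std = {:10.4f}'.format(name, mu, sigma))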
# See a summary of your training data
train_DF.describe().show()
+-------+------------------+
|summary|                MV|
+-------+------------------+
|  count|               393|
|   mean|22.406615776081424|
| stddev| 9.167890227412919|
|    min|               5.0|
|    max|              50.0|
+-------+------------------+
# Use LinearRegression for training a regression model.
# Use maxIter = 100.
# Use the following values for regParam and elasticNetParam and see which one works better.
# 1. regParam = 0, elasticNetParam = 0
# 2. regParam = 0.3, elasticNetParam = 0.5
# Look into the API specification to get more details.
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(
    featuresCol='scaled',
    labelCol='MV',
    maxIter=100,
    regParam=0,
    elasticNetParam=0
)
lr_model = lr.fit(train_DF)
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
RMSE: 4.801676
r2: 0.724987
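For comparison, the second configuration listed in the comments above (regParam = 0.3, elasticNetParam = 0.5) can be trained the same way; a sketch reusing the scaled training data:
# Sketch: elastic-net regularized variant, for comparison with the unregularized fit above.
lr_en = LinearRegression(featuresCol='scaled', labelCol='MV',
                         maxIter=100, regParam=0.3, elasticNetParam=0.5)
lr_en_model = lr_en.fit(train_DF)
print("RMSE: %f" % lr_en_model.summary.rootMeanSquaredError)
print("r2: %f" % lr_en_model.summary.r2)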
# Testing the model on test data.
# Print RMSE and r2 values at the end on the test data.
# YOUR CODE HERE
test_result = lr_model.evaluate(test_DF)
print("RMSE: %f" % test_result.rootMeanSquaredError)
print("r2: %f" % test_result.r2)
RMSE: 4.300042
r2: 0.785494
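Since the features were standardized, the fitted coefficients are comparable in magnitude; a short sketch for inspecting them:
# Sketch: pair each feature name with its learned coefficient (largest magnitudes matter most).
for name, coef in zip(features, lr_model.coefficients.toArray()):
    print('{:>8s}: {: .4f}'.format(name, coef))
print('intercept: {: .4f}'.format(lr_model.intercept))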
# Plotting using matplotlib
# In the test data, you have labels, and you also have predictions for each of the test data.
# Plot a scatter plot of the labels and predictions on a single plot so that you can
# visualize how the predictions compare to the ground truth.
test_DF = lr_model.transform(test_DF)
x = test_DF.select('prediction').toPandas()
y = test_DF.select('MV').toPandas()
plt.scatter(x, y)
plt.show()
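A small refinement that often helps this plot is labelling the axes and adding a y = x reference line, so distance from the diagonal reads directly as prediction error; a sketch using the same x and y:
# Sketch: labelled scatter plot with a y = x reference line for perfect predictions.
plt.scatter(x, y, alpha=0.6)
lims = [min(x.prediction.min(), y.MV.min()), max(x.prediction.max(), y.MV.max())]
plt.plot(lims, lims, 'r--', label='perfect prediction')
plt.xlabel('Predicted MV ($1000s)')
plt.ylabel('Actual MV ($1000s)')
plt.legend()
plt.show()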
# Tuning the elastic net parameter
# Use different values of elasticNetParam: [0, 0.1, 0.2, 0.3, 0.4, ... 1.0] and figure out which one is the best.
from pyspark.ml.evaluation import RegressionEvaluator
lr = LinearRegression(featuresCol='featureVector', labelCol='MV', regParam=0.3, maxIter=100)
paramGrid = (ParamGridBuilder()
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0])
             .build())
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol='MV'),
                          numFolds=2)
cvModel = crossval.fit(train_DF)
best_model = cvModel.bestModel
print("Best elasticNetParam = ", best_model._java_obj.getElasticNetParam(), ",regParam = 0.3")
Best elasticNetParam = 0.0 ,regParam = 0.3
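CrossValidator also records the average metric (RMSE by default for RegressionEvaluator) for each candidate in the grid, ordered like paramGrid; a sketch that makes the comparison across elasticNetParam values explicit:
# Sketch: average cross-validated RMSE for each elasticNetParam value in the grid.
for params, rmse in zip(paramGrid, cvModel.avgMetrics):
    print('elasticNetParam = {:.1f}  avg RMSE = {:.4f}'.format(params[lr.elasticNetParam], rmse))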
# Stop the spark session
spark.stop()