from pyspark.sql import SparkSession
import pandas as pd
import yaml
import os
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
The second part of this analysis covers exploratory data analysis (EDA), feature selection, and finally the creation of subset data to push to Tableau for building a dashboard. The dashboard will show comparisons between all directors overall and Quentin Tarantino. Simple EDA charts and machine-learning-based feature selection are used to explore and extract key information from the data.
Load Libraries
Create Spark Session and Create SQL Table
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
import os
# Start Spark, load the movies CSV and expose it to Spark SQL as the temporary
# view `movies`. Failures are reported with print() instead of being re-raised,
# so on failure `spark` / `df` may be left undefined for the cells that follow.
try:
    # Initialize Spark session
    print("Initializing Spark session...")
    spark = SparkSession.builder \
        .appName("movies_db") \
        .config("spark.jars", "/usr/local/spark/jars/postgresql-42.7.4.jar") \
        .config("spark.driver.extraClassPath", "/usr/local/spark/jars/postgresql-42.7.4.jar") \
        .config("spark.executor.extraClassPath", "/usr/local/spark/jars/postgresql-42.7.4.jar") \
        .config("spark.driver.memory", "4g") \
        .config("spark.executor.memory", "4g") \
        .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer") \
        .getOrCreate()
    print("Spark session created successfully.")

    # Path within the Docker container
    csv_file = "./imbd_movies.csv"

    # Check if the CSV file exists, so a missing file is reported as a file
    # error rather than as a Spark analysis error
    if not os.path.exists(csv_file):
        raise FileNotFoundError(f"CSV file not found at path: {csv_file}")

    # Read CSV file into DataFrame (first row is the header; column types are inferred)
    print(f"Attempting to read CSV file from path: {csv_file}")
    df = spark.read.csv(csv_file, header=True, inferSchema=True)
    print("CSV file read successfully.")

    # Create a temporary view for SQL queries
    df.createOrReplaceTempView("movies")
    print("Temporary view 'movies' created successfully.")
except FileNotFoundError as e:
    print("File error:", e)
except AnalysisException as e:
    print("DataFrame operation error:", e)
except Exception as e:
    print("An unexpected error occurred:", e)
Initializing Spark session...
Spark session created successfully.
Attempting to read CSV file from path: ./imbd_movies.csv
CSV file read successfully.
Temporary view 'movies' created successfully.
This shows the connection to the Spark container. The IMDb Movies dataset is available inside the container because the initial `docker run` command mounted my local directory into it. Mounting the directory this way is easier than copying the dataset into the container.
TOP 10 Rows
# Preview the first 10 rows of the `movies` view as a pandas DataFrame.
movies = spark.sql("""
    SELECT *
    FROM movies
    LIMIT 10
""")
movies.toPandas()
Best Picture | Certificate (GB) | Certificate (US) | Color | Contains Genre? | Contains Production Company? | Continent | Country | Genres (1st) | Genres (2nd) | ... | Title Id | What did they do ? | Who did they play ? | Year of Release | Billing (position in cast list) | IMDB Rating | Number of people | Number of titles | Number Of Votes | Runtime (Minutes) | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | None | None | None | None | None | None | None | None | None | None | ... | tt0193499 | actor | None | 1952 | 1 | None | 1 | 1 | None | None |
1 | None | None | None | None | None | False | None | None | None | None | ... | tt0193293 | actor | Lam Tsi-King | 1954 | 1 | None | 1 | 1 | None | None |
2 | None | None | None | None | None | False | None | None | None | None | ... | tt0245957 | actor | None | 1961 | 1 | None | 1 | 1 | None | None |
3 | None | None | None | None | None | None | None | None | None | None | ... | tt0247371 | actor | Sculptor Martin | 1965 | 1 | None | 1 | 1 | None | None |
4 | None | None | None | None | None | None | None | None | None | None | ... | tt0498914 | actor | Jagga | 1992 | 1 | None | 1 | 1 | None | 132 |
5 | None | None | None | None | None | None | None | None | None | None | ... | tt6448596 | actor | Sakir | 2007 | 1 | 1.8 | 1 | 1 | 61 | 95 |
6 | None | None | None | None | None | None | None | None | None | None | ... | tt0205040 | actor | Ng Chi Keung | 1952 | 2 | None | 1 | 1 | None | None |
7 | None | None | None | None | None | None | None | None | None | None | ... | tt0192202 | actor | None | 1953 | 2 | None | 1 | 1 | None | None |
8 | None | None | None | None | None | False | None | None | None | None | ... | tt0192049 | actor | None | 1953 | 2 | None | 1 | 1 | None | None |
9 | None | None | None | None | None | False | None | None | None | None | ... | tt0193153 | actor | None | 1958 | 2 | None | 1 | 1 | None | None |
10 rows × 37 columns
Distribution Comparison: Overall vs Quentin Tarantino
# Count director credits per IMDb rating bucket, once for every director and
# once for Quentin Tarantino only.
#
# BETWEEN is inclusive on both ends and CASE returns the first matching branch,
# so a rating that sits exactly on a boundary (2, 4, 6, 8) lands in the lower
# bucket. Rows whose rating is NULL match no branch and are grouped under a
# NULL rating_range.

# Rating distribution for all directors
all_directors_ratings = spark.sql("""
    SELECT
        CASE
            WHEN CAST(`IMDB Rating` AS FLOAT) BETWEEN 0 AND 2 THEN '0-2'
            WHEN CAST(`IMDB Rating` AS FLOAT) BETWEEN 2 AND 4 THEN '2-4'
            WHEN CAST(`IMDB Rating` AS FLOAT) BETWEEN 4 AND 6 THEN '4-6'
            WHEN CAST(`IMDB Rating` AS FLOAT) BETWEEN 6 AND 8 THEN '6-8'
            WHEN CAST(`IMDB Rating` AS FLOAT) BETWEEN 8 AND 10 THEN '8-10'
        END AS rating_range,
        COUNT(*) AS movie_count
    FROM movies
    WHERE `What did they do ?` = 'director'
    GROUP BY rating_range
    ORDER BY rating_range
""")

# Rating distribution for Quentin Tarantino's movies
tarantino_ratings = spark.sql("""
    SELECT
        CASE
            WHEN CAST(`IMDB Rating` AS FLOAT) BETWEEN 0 AND 2 THEN '0-2'
            WHEN CAST(`IMDB Rating` AS FLOAT) BETWEEN 2 AND 4 THEN '2-4'
            WHEN CAST(`IMDB Rating` AS FLOAT) BETWEEN 4 AND 6 THEN '4-6'
            WHEN CAST(`IMDB Rating` AS FLOAT) BETWEEN 6 AND 8 THEN '6-8'
            WHEN CAST(`IMDB Rating` AS FLOAT) BETWEEN 8 AND 10 THEN '8-10'
        END AS rating_range,
        COUNT(*) AS movie_count
    FROM movies
    WHERE `Person Name` = 'Quentin Tarantino' AND `What did they do ?` = 'director'
    GROUP BY rating_range
    ORDER BY rating_range
""")

# Convert to pandas for plotting
all_directors_df = all_directors_ratings.toPandas()
tarantino_df = tarantino_ratings.toPandas()
Across all directors, IMDb ratings span every bin from 0-2 to 8-10. Tarantino's lowest-rated bin, by contrast, is 4-6, and it contains a single film. He has six films in the 6-8 bin and five films in the 8-10 bin.
import matplotlib.pyplot as plt
import seaborn as sns
# Draw one bar chart of movie count per IMDb rating bucket for all directors
# and a second, separate chart for Quentin Tarantino.

# Tag each frame with the group it describes
all_directors_df['Director'] = 'All Directors'
tarantino_df['Director'] = 'Quentin Tarantino'

# Give both frames a clean 0..n-1 index
all_directors_df = all_directors_df.reset_index(drop=True)
tarantino_df = tarantino_df.reset_index(drop=True)

# Make 'rating_range' an ordered categorical so the bars follow the bucket
# order; values outside `rating_order` (e.g. a missing bucket) become NaN.
rating_order = ['0-2', '2-4', '4-6', '6-8', '8-10']
all_directors_df['rating_range'] = pd.Categorical(all_directors_df['rating_range'], categories=rating_order, ordered=True)
tarantino_df['rating_range'] = pd.Categorical(tarantino_df['rating_range'], categories=rating_order, ordered=True)

# Create separate figures for each plot
# Plot for All Directors
plt.figure(figsize=(10, 6))
sns.barplot(data=all_directors_df, x='rating_range', y='movie_count', color='darkgrey')
plt.xlabel('IMDb Rating Range')
plt.ylabel('Movie Count')
plt.title('IMDb Rating Distribution: All Directors')
plt.show()

# Plot for Quentin Tarantino
plt.figure(figsize=(10, 6))
sns.barplot(data=tarantino_df, x='rating_range', y='movie_count', color='k')
plt.xlabel('IMDb Rating Range')
plt.ylabel('Movie Count')
plt.title('IMDb Rating Distribution: Quentin Tarantino')
plt.show()
Runtime Comparison (Overall vs. Quentin Tarantino)
# Average / minimum / maximum runtime of director credits, for all directors
# and for Quentin Tarantino. Only runtimes strictly between 60 and 300 minutes
# are considered, which also drops rows with a missing runtime.
overall_runtime_stats = spark.sql("""
    SELECT
        AVG(CAST(`Runtime (Minutes)` AS FLOAT)) AS avg_runtime,
        MIN(CAST(`Runtime (Minutes)` AS FLOAT)) AS min_runtime,
        MAX(CAST(`Runtime (Minutes)` AS FLOAT)) AS max_runtime
    FROM movies
    WHERE `What did they do ?` = 'director' AND `Runtime (Minutes)` < 300 AND `Runtime (Minutes)` > 60
""")

tarantino_runtime_stats = spark.sql("""
    SELECT
        AVG(CAST(`Runtime (Minutes)` AS FLOAT)) AS avg_runtime,
        MIN(CAST(`Runtime (Minutes)` AS FLOAT)) AS min_runtime,
        MAX(CAST(`Runtime (Minutes)` AS FLOAT)) AS max_runtime
    FROM movies
    WHERE `Person Name` = 'Quentin Tarantino' AND `What did they do ?` = 'director' AND `Runtime (Minutes)` < 300 AND `Runtime (Minutes)` > 60
""")

# Each result is a single-row frame holding the three aggregates
overall_runtime_df = overall_runtime_stats.toPandas()
tarantino_runtime_df = tarantino_runtime_stats.toPandas()
import pandas as pd
import matplotlib.pyplot as plt
# Build a two-row comparison table (All Directors vs. Quentin Tarantino) from
# the single-row aggregate frames and draw one bar chart per runtime metric.
# `.iloc[0]` reads the first row by position, independent of the index labels.
runtime_data = {
    "Director": ["All Directors", "Quentin Tarantino"],
    "Avg Runtime": [overall_runtime_df['avg_runtime'].iloc[0], tarantino_runtime_df['avg_runtime'].iloc[0]],
    "Min Runtime": [overall_runtime_df['min_runtime'].iloc[0], tarantino_runtime_df['min_runtime'].iloc[0]],
    "Max Runtime": [overall_runtime_df['max_runtime'].iloc[0], tarantino_runtime_df['max_runtime'].iloc[0]],
}
runtime_df = pd.DataFrame(runtime_data)

# Separate plots for each metric
metrics = ["Avg Runtime", "Min Runtime", "Max Runtime"]
titles = ["Average Runtime Comparison", "Minimum Runtime Comparison", "Maximum Runtime Comparison"]

for metric, title in zip(metrics, titles):
    runtime_df.plot(x='Director', y=metric, kind="bar", legend=False, color=['darkgray', 'k'], figsize=(8, 6))
    plt.title(title)
    plt.ylabel("Runtime (Minutes)")
    plt.xlabel("Director Category")
    plt.show()
Genre Comparison: (Overall vs. Quentin Tarantino)
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
# Compare how often each genre occurs for Quentin Tarantino versus the whole
# dataset. `Genres (full list)` is split on ',' and exploded to one row per
# genre; each token is trimmed so that e.g. ' Drama' and 'Drama' are counted
# as the same genre.
# NOTE(review): unlike the other sections, neither query filters on
# `What did they do ?` = 'director' — confirm that counting every credit is
# intended here.

# Retrieve genre distributions from Spark SQL
tarantino_genres = spark.sql("""
    SELECT TRIM(genre) AS genre, COUNT(*) AS genre_count
    FROM (
        SELECT EXPLODE(SPLIT(`Genres (full list)`, ',')) AS genre
        FROM movies
        WHERE `Person Name` = 'Quentin Tarantino'
    ) AS tarantino_genres
    GROUP BY TRIM(genre)
    ORDER BY genre_count DESC
""")

all_genres = spark.sql("""
    SELECT TRIM(genre) AS genre, COUNT(*) AS genre_count
    FROM (
        SELECT EXPLODE(SPLIT(`Genres (full list)`, ',')) AS genre
        FROM movies
    ) AS all_genres
    GROUP BY TRIM(genre)
    ORDER BY genre_count DESC
""")

# Convert the results to pandas DataFrames for plotting
tarantino_genres_df = tarantino_genres.toPandas()
all_genres_df = all_genres.toPandas()

# Add a 'Director' column to differentiate the datasets
tarantino_genres_df['Director'] = 'Quentin Tarantino'
all_genres_df['Director'] = 'All Directors'

# Combine both DataFrames into one for easier plotting; ignore_index avoids
# duplicate index labels in the combined frame
combined_df = pd.concat([tarantino_genres_df, all_genres_df], ignore_index=True)

# Sort the combined DataFrame by genre count in descending order
combined_df = combined_df.sort_values(by='genre_count', ascending=False)

# Set up the plot
plt.figure(figsize=(12, 8))

# Plot the natural log of the counts so the small Tarantino counts stay visible
# next to the dataset-wide counts
log_y = np.log(combined_df['genre_count'])

# Create the grouped bar plot. hue_order pins the palette so Tarantino is black
# and "All Directors" grey, matching the other charts; without it the colours
# depend on which group happens to come first after sorting.
sns.barplot(data=combined_df, x='genre', y=log_y, hue='Director',
            hue_order=['Quentin Tarantino', 'All Directors'],
            palette=['k', 'darkgrey'])

# Rotate the x-axis labels for better readability
plt.xticks(rotation=90)

# Add labels and title
plt.xlabel('Genre')
plt.ylabel('Movie Count')
plt.title('Genre Distribution: Quentin Tarantino vs All Directors (Log Scale)')
plt.legend(title="Director Category")

# Show the plot
plt.tight_layout()
plt.show()
Production Company Comparison: (TOP 10 Overall vs. Quentin Tarantino)
# Count director credits per primary production company
# (`Production Companies (1st)`), skipping rows that have no production
# company list at all.

# General production company distribution for all directors (top 10 only)
production = spark.sql("""
    SELECT
        `Production Companies (1st)` AS production_company,
        COUNT(*) AS movie_count
    FROM movies
    WHERE `What did they do ?` = 'director' AND `Production Companies (List)` IS NOT NULL
    GROUP BY production_company
    ORDER BY movie_count DESC
    LIMIT 10
""")

# Production companies for Quentin Tarantino (no limit)
production_qt = spark.sql("""
    SELECT
        `Production Companies (1st)` AS production_company,
        COUNT(*) AS movie_count
    FROM movies
    WHERE `Person Name` = 'Quentin Tarantino' AND `What did they do ?` = 'director' AND `Production Companies (List)` IS NOT NULL
    GROUP BY production_company
    ORDER BY movie_count DESC
""")

# Convert Spark DataFrames to Pandas DataFrames
tarantino_production_df = production_qt.toPandas()
all_production_df = production.toPandas()
# Create comparison plots
import matplotlib.pyplot as plt
import seaborn as sns
# Grouped bar chart of movie count per production company, Tarantino versus
# the overall top 10, drawn on a logarithmic y-axis.

# Add a 'Director' column to each DataFrame to differentiate
tarantino_production_df['Director'] = 'Quentin Tarantino'
all_production_df['Director'] = 'All Directors'

# Combine both DataFrames into one for easier plotting
combined_production_df = pd.concat([tarantino_production_df, all_production_df])

# Set up the plot
plt.figure(figsize=(12, 8))

# Create a bar plot with production companies on the x-axis and movie count on
# the y-axis; hue_order makes the colour assignment explicit (Tarantino black,
# All Directors grey)
sns.barplot(data=combined_production_df, x='production_company', y='movie_count', hue='Director',
            hue_order=['Quentin Tarantino', 'All Directors'],
            palette=['k', 'darkgrey'])

# Apply a logarithmic scale to the y-axis
plt.yscale('log')

# Rotate x-axis labels for readability
plt.xticks(rotation=45, ha='right')

# Add labels and title
plt.xlabel('Production Company')
plt.ylabel('Log of Movie Count')
plt.title('Top Production Companies: Quentin Tarantino vs All Directors')
plt.legend(title="Director Category")

# Show plot
plt.tight_layout()
plt.show()
Time-Series: Average IMDb Rating (Overall vs. Quentin Tarantino)
# Per release year: number of director credits and average IMDb rating, for
# Quentin Tarantino and for all directors, combined into one cleaned frame.
tarantino_yearly = spark.sql("""
    SELECT
        CAST(`Year of Release` AS INT) AS year,
        COUNT(*) AS movie_count,
        AVG(CAST(`IMDB Rating` AS FLOAT)) AS avg_rating
    FROM movies
    WHERE `Person Name` = 'Quentin Tarantino' AND `What did they do ?` = 'director'
    GROUP BY year
    ORDER BY year
""")

all_directors_yearly = spark.sql("""
    SELECT
        CAST(`Year of Release` AS INT) AS year,
        COUNT(*) AS movie_count,
        AVG(CAST(`IMDB Rating` AS FLOAT)) AS avg_rating
    FROM movies
    WHERE `What did they do ?` = 'director'
    GROUP BY year
    ORDER BY year
""")

# Convert Spark DataFrames to pandas
tarantino_yearly_df = tarantino_yearly.toPandas()
all_directors_yearly_df = all_directors_yearly.toPandas()

# Add 'Director' column to distinguish in plotting
tarantino_yearly_df['Director'] = 'Quentin Tarantino'
all_directors_yearly_df['Director'] = 'All Directors'

# Combine DataFrames
combined_yearly_df = pd.concat([tarantino_yearly_df, all_directors_yearly_df])

# Check for any missing values in the DataFrame
#print(combined_yearly_df.isna().sum())
# Drop rows with a missing year or average rating (years in which no credit
# has a rating yield a NULL average)
combined_yearly_df.dropna(subset=['year', 'avg_rating'], inplace=True)

# Ensure 'year' is integer type and 'avg_rating' is numeric; values that
# cannot be parsed as numbers become NaN
combined_yearly_df['year'] = combined_yearly_df['year'].astype(int)
combined_yearly_df['avg_rating'] = pd.to_numeric(combined_yearly_df['avg_rating'], errors='coerce')

# Drop any rows whose 'avg_rating' was coerced to NaN by the conversion above
combined_yearly_df.dropna(subset=['avg_rating'], inplace=True)
# Try plotting again
import matplotlib.pyplot as plt
import seaborn as sns
# Two stacked line charts over release year: movie count (top) and average
# IMDb rating (bottom), Tarantino in black and all directors in grey.
plt.figure(figsize=(10, 6))

# Plot number of movies per year
plt.subplot(2, 1, 1)
sns.lineplot(data=combined_yearly_df, x='year', y='movie_count', hue='Director',
             hue_order=['Quentin Tarantino', 'All Directors'],
             marker='o', palette=['k', 'darkgrey'])
plt.xlabel('Year')
plt.ylabel('Number of Movies')
plt.title('Number of Movies Released Per Year: Quentin Tarantino vs All Directors')

# Plot average IMDb rating per year
plt.subplot(2, 1, 2)
sns.lineplot(data=combined_yearly_df, x='year', y='avg_rating', hue='Director',
             hue_order=['Quentin Tarantino', 'All Directors'],
             marker='o', palette=['k', 'darkgrey'])
plt.xlabel('Year')
plt.ylabel('Average IMDb Rating')
plt.title('Average IMDb Rating Per Year: Quentin Tarantino vs All Directors')

plt.tight_layout()
plt.show()
LASSO Modeling: Extract Features for Binary Outcome
# Use Spark SQL to select Quentin Tarantino's director credits and add a binary
# `label`: 1 when the IMDb rating is at least 7, otherwise 0.
# The rating is cast to INT (fractional part dropped) before the comparison;
# for ratings >= 0 this gives the same result as comparing the raw rating with
# 7.0. Rows with a NULL rating fall into the ELSE branch and get label 0.
query = """
    SELECT *, CASE WHEN CAST(`IMDB Rating` AS INT) >= 7.0 THEN 1 ELSE 0 END AS label
    FROM movies
    WHERE `Person Name` = 'Quentin Tarantino' AND `What did they do ?` = 'director'
"""
filtered_df = spark.sql(query)
# filtered_df = filtered_df.dropna()
# filtered_df = filtered_df.distinct()
# # Create a list of string-type columns from the updated filtered_df, excluding 'label'
# string_features = [field.name for field in filtered_df.schema.fields
# if isinstance(field.dataType, StringType) and field.name != "label"]

# Remove the rating itself so the model cannot read the label from its source column
filtered_df = filtered_df.drop("IMDB Rating")
from pyspark.sql.functions import col, countDistinct
# Replace nulls in string columns with the literal "missing"
# (na.fill with a string value only affects string-typed columns).
filtered_df = filtered_df.na.fill("missing")

# Identify columns with more than one distinct non-null value.
# All distinct counts are computed in a single aggregation instead of running
# one Spark job per column; countDistinct ignores nulls, matching a
# "filter not-null, then distinct().count()" check.
distinct_counts = filtered_df.agg(
    *[countDistinct(col(column)) for column in filtered_df.columns]
).first()
valid_columns = [
    column
    for column, n_distinct in zip(filtered_df.columns, distinct_counts)
    if n_distinct > 1
]

# Keep only valid columns (constant columns carry no information for the model)
filtered_df = filtered_df.select(*valid_columns)
from pyspark.sql import functions as F
# Replace null values in the entire DataFrame with "missing"
from pyspark.sql.types import IntegerType, DoubleType, StringType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Build the feature-preparation pipeline: string columns are indexed and
# one-hot encoded, integer/double columns are used as-is, and everything is
# assembled into a single `features` vector column.

# Extract string features
string_features = [field.name for field in filtered_df.schema.fields
                   if isinstance(field.dataType, StringType)]

# Indexing string features (rows with labels unseen during fitting are skipped)
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}Index", handleInvalid="skip")
    for name in string_features
]

# Encoding indexed string features
encoders = [
    OneHotEncoder(inputCol=f"{name}Index", outputCol=f"{name}Encoded")
    for name in string_features
]

# Extract numerical features; only IntegerType and DoubleType columns are
# picked up, and the target column `label` is excluded
numerical_features = [field.name for field in filtered_df.schema.fields
                      if isinstance(field.dataType, (IntegerType, DoubleType)) and field.name != "label"]

# Assemble both encoded string features and numerical features
# (encoded columns come first in the resulting vector, then the numerical ones)
assembler = VectorAssembler(
    inputCols=[f"{name}Encoded" for name in string_features] + numerical_features,
    outputCol="features"
)

# Create pipeline with indexing, encoding, and feature assembly
pipeline = Pipeline(stages=indexers + encoders + [assembler])

# Transform the data
prepared_data = pipeline.fit(filtered_df).transform(filtered_df)
# Fit a binomial logistic regression on `features` -> `label`, selecting the
# regularisation settings by 10-fold cross-validation.

# Define logistic regression model; elasticNetParam=1.0 is a pure L1 (Lasso)
# penalty. Note that the parameter grid below overrides this value.
lasso_logistic = LogisticRegression(featuresCol="features", labelCol="label",
                                    elasticNetParam=1.0, family="binomial",
                                    rawPredictionCol="rawPrediction",
                                    probabilityCol="probability",
                                    predictionCol="prediction")

# Set up the evaluator
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction")

# Define parameter grid for cross-validation: 2 regularisation strengths x
# 2 penalty types (0.0 = L2/Ridge, 1.0 = L1/Lasso), so the selected model is
# not necessarily a Lasso model
paramGrid = (ParamGridBuilder()
             .addGrid(lasso_logistic.regParam, [0.1, 0.01])  # Example of Lasso tuning
             .addGrid(lasso_logistic.elasticNetParam, [0.0, 1.0])  # Lasso vs Ridge
             .build())

# Set up cross-validation
crossval = CrossValidator(estimator=lasso_logistic,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=10)  # 10-fold cross-validation

# Fit the model with cross-validation
cv_model = crossval.fit(prepared_data)
# Pair every coefficient of the best cross-validated model with the name of
# the feature it belongs to, then report the positive coefficients.
best_model = cv_model.bestModel

# Access the coefficients from the best model
coefficients = best_model.coefficients.toArray()

# Access the metadata for the assembled features column
metadata = prepared_data.schema["features"].metadata

# Extract the names of all features used in the VectorAssembler.
# The metadata groups attributes by type (e.g. "numeric", "binary"), and each
# attribute carries its position in the feature vector as "idx". Names must be
# ordered by that position: simply concatenating the groups puts them in a
# different order than the coefficient vector and mislabels the coefficients.
indexed_feature_names = []
if "ml_attr" in metadata and "attrs" in metadata["ml_attr"]:
    attrs = metadata["ml_attr"]["attrs"]
    for attr_group in attrs.values():
        for attr in attr_group:
            indexed_feature_names.append((attr["idx"], attr["name"]))

# Final feature names, in feature-vector order
expanded_feature_names = [name for _, name in sorted(indexed_feature_names)]

# Debug: Print the number of final expanded feature names and coefficients
print(f"Number of expanded feature names: {len(expanded_feature_names)}")
print(f"Number of coefficients: {len(coefficients)}")

# Ensure there is no mismatch between the number of expanded feature names and coefficients
if len(expanded_feature_names) == len(coefficients):
    # Create a dictionary of feature names and their corresponding coefficients
    feature_importance = dict(zip(expanded_feature_names, coefficients))

    # Keep only positive coefficients
    positive_feature_importance = {feature: coef for feature, coef in feature_importance.items() if coef > 0}

    # Sort the features by absolute value of the coefficients
    sorted_positive_feature_importance = sorted(positive_feature_importance.items(), key=lambda x: abs(x[1]), reverse=True)

    # Display the sorted feature importance for positive coefficients only
    print("Detailed Feature Importance (positive coefficients only, sorted by absolute value):")
    for feature, coef in sorted_positive_feature_importance:
        print(f"Feature: {feature}, Coefficient: {coef}")
else:
    print("There is still a mismatch between the number of expanded feature names and coefficients. Please check the data pipeline.")
import textwrap
import matplotlib.pyplot as plt
import seaborn as sns
# Horizontal bar chart of the (up to 50) largest model coefficients, with long
# feature names truncated so the axis labels stay readable.
max_len = 50  # Maximum feature name length before truncating

# (feature, coefficient) pairs to plot.
# NOTE(review): this variable was not defined anywhere in the notebook; it is
# assumed to mean "features the model kept", i.e. those with a non-zero
# coefficient in `feature_importance` — confirm this matches the intent.
filtered_feature_coeff_df = [
    (feature, coef) for feature, coef in feature_importance.items() if coef != 0
]

# Truncate over-long names by character count. (textwrap.shorten drops whole
# words, so a long name without spaces would be reduced to just "...".)
placeholder = "..."
wrapped_features = [
    (feature[:max_len - len(placeholder)] + placeholder if len(feature) > max_len else feature, coef)
    for feature, coef in filtered_feature_coeff_df
]

# Sort features by coefficient values for better visualization (top 50 only)
sorted_feature_coeff_df = sorted(wrapped_features, key=lambda x: x[1], reverse=True)[:50]

if not sorted_feature_coeff_df:
    # zip(*[]) cannot be unpacked into two names, so handle "nothing to plot" explicitly
    print("No non-zero feature coefficients to plot.")
else:
    sorted_features, sorted_coefficients = zip(*sorted_feature_coeff_df)

    # Black for positive coefficients, grey for the rest
    colors = ['k' if coeff > 0 else 'darkgrey' for coeff in sorted_coefficients]

    # Plot the coefficients with color palette
    plt.figure(figsize=(12, 8))
    plt.barh(sorted_features, sorted_coefficients, color=colors)
    plt.xlabel('Coefficient Value')
    plt.ylabel('Features')
    plt.title('Filtered Feature Coefficients from Logistic Regression')

    # Invert the y-axis to make the biggest bar appear at the top
    plt.gca().invert_yaxis()
    plt.tight_layout()
    plt.show()