Click Through Rate Prediction

by Aaron Olson, Daniel Elkin & Roger Leung - Sat 08 June 2019
Tags: #python #gradient boost #decision tree #spark #pyspark

Table of Contents Section 1 - Question Formulation Section 3 - EDA & Challenges Section 1 - Question Formulation In this notebook, we build a model to perform clickthrough rate prediction using a public dataset from CriteoLabs, which is available here: https://www.kaggle.com/c/criteo-display-ad-challenge/data.

Clickthrough rate, or CTR, is a metric used by advertisers to determine how effectively their ads are attracting visitors to their websites or products and is the ratio of the number of times an ad has been clicked to the number of times an ad has been shown (https://support.google.com/google-ads/answer/2615875?hl=en). A relatively high CTR means that users are being show relevant ads, and advertisers and ad publishers thus want to maximize CTR in order to attract the most possible users and maximize profits.

The dataset used here contains seven days of data from Criteo, an ad targeting company. There is one target column for each row that indicates whether or not an ad was clicked and 39 feature columns. While we are not told what each feature represents, we know that they are data related to the publisher, the advertiser, the user and interactions between the user and the advertiser (https://www.kaggle.com/c/criteo-display-ad-challenge/discussion/9568#49738). The task is then to use these features to predict whether or not an ad was clicked.

In general, the average CTR is less than two percent across industries, so a model that achieves better performance would be practically useful (https://blog.hubspot.com/agency/google-adwords-benchmark-data). As we'll see below, this particular dataset has a mean CTR of about 25%. The baseline accuracy we'll seek to exceed is thus 75%, as this would be the accuracy a model would achieve by simply predicting the negative class in all cases. The model we'll use here is a decision tree.

Decision Tree and Gradient Boosting in Python

In our analysis for click through rate we will implement the gradient boosting algorithm in order to fit a number of decision trees to the feature space. With the goal of predicting the proportion of clicks in order to determine the most beneficial add space, we will need to properly assess whether or no a user is likely to click on an add given the 38 features we have in our dataset.

First we will discuss the method to fit a single decision tree in python, we will then expand this to the basis of gradient boost which will be the algorithm we will utilize on our full CTR dataset.

Single Decision Tree

The goal of a decision tree is to split up the rows into pure samples by splitting up based on a feature value. We take a number of splits in order to do so. For example, if our decision tree were attempting to determine whether a sign is a stop sign (amongst examples of signs), one decision could be whether the sign was red or not. Presumably this would make the sample space more pure as all stop signs are red, whereas if a sign isn't red it won't be in the true split (our negative split is 100% pure for non-stop sign). We would then ask further questions of our true split to continue to filter down. Because we can presumably develop trees until there is a single example in each leaf (terminating node) decision trees are prone to overfitting. To prevent a tree from overfitting, we can set a minimum number of examples in a leaf node, if a returned branch is smaller than this value we cannot split make the split, this is the same as early termination in other machine learning routines.

To make the most judicious split we have to sample at every potential split (greedy algorithm). In the case of categorical features we can make use of the Brieman technique where we divide the feature by value and sort by mean target variable. For example, if our feature has values a, b and c and the means of the target variable (binary coded as 0 / 1) is 0.1, 0.8, 0.3 respectively, we would order our values a, c and b. We can then walk through the splits sequentially to reduce the number of total splits required.

By iterating over each feature / column and each value in each feature we compute the best score metric and keep track of which feature and value produces the best score. There are a number of score metrics that can be used, in the example below we use the gini score.

The gini score is defined as: $\sum_{k}^{K} = p_{mk} * (1-p_{mk})$ where $p_{mk} =$ proportion of class k in partition m $ = \frac{1}{N_m} \sum I(y_i = k)$

The Gini score takes the proportion that are correctly classified and multiply by the proportion that isn't correctly classified, summing over all classes of the target variable. A Gini index score of 0 means perfect segregation given the data. Gini allows us a more complex method to compute error for a classification target variable.

We compute the gini index for a potential split point for a specific variable and then compute the information gained. Our best split point is defined as the split that gives us the most information gained. Information gain is defined as: $Gain = Current Uncertainty - (\frac{n_{LHS}}{N} Gini_{LHS} + (1-\frac{n_{LHS}}{N}) Gini_{RHS})$. After iterating across all potential variables and split points we choose the split that generates the highest information gain. We then recursively call the algorithm until the entire tree structure is defined.

The goal of a single decision tree is to create branch points that have the highest purity of outcome variable possible as mentioned earlier. Below we represent the python code and build a tree for a toy dataset. There are limitations with a single decision tree which will be discussed down below when we address gradient boost techniques.

# imports
import re
import time
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

from pyspark.sql import types
import pyspark.sql
import pyspark.sql.functions

# start Spark Session
from pyspark.sql import SparkSession
app_name = "hw3_notebook"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext

# create RDDs
trainRDD = sc.textFile("./data/train.txt")
testRDD = sc.textFile("./data/test.txt")
toytrainRDD = sc.textFile('./data/toy_train.txt')
TOY_FILE = "./data/train.tiny.csv"
df = pd.read_csv(TOY_FILE)
df = df.drop(['Id'], axis = 1)
df.head()
Label I1 I2 I3 I4 I5 I6 I7 I8 I9 ... C17 C18 C19 C20 C21 C22 C23 C24 C25 C26
0 0 1.0 1 5.0 0.0 1382.0 4.0 15.0 2.0 181.0 ... e5ba7672 f54016b9 21ddcdc9 b1252a9d 07b5194c NaN 3a171ecb c5c50484 e8b83407 9727dd16
1 0 2.0 0 44.0 1.0 102.0 8.0 2.0 2.0 4.0 ... 07c540c4 b04e4670 21ddcdc9 5840adea 60f6221e NaN 3a171ecb 43f13e8b e8b83407 731c3655
2 0 2.0 0 1.0 14.0 767.0 89.0 4.0 2.0 245.0 ... 8efede7f 3412118d NaN NaN e587c466 ad3062eb 3a171ecb 3b183c5c NaN NaN
3 0 NaN 893 NaN NaN 4392.0 NaN 0.0 0.0 0.0 ... 1e88c74f 74ef3502 NaN NaN 6b3a5ca6 NaN 3a171ecb 9117a34a NaN NaN
4 0 3.0 -1 NaN 0.0 2.0 0.0 3.0 0.0 0.0 ... 1e88c74f 26b3c7a7 NaN NaN 21c9516a NaN 32c7478e b34f3128 NaN NaN

5 rows × 40 columns

def gini_index(gini_df):
    '''
    Compute the gini index for categorical features/variables
    Input: 
        gini_df: pandas dataframe used to compute gini index
    Returns: 
        impurity: gini index score of the dataframe
    '''
    total_size = df.size
    gini = 0.0
    size = float(len(gini_df))
    if size == 0:
        return 1
    counts = gini_df['Label'].unique()
    # score the group based on the score for each class
    impurity = 1
    for count in counts:
        prob_of_lbl = len(gini_df[gini_df['Label'] == count]) / float(len(gini_df))
        impurity -= prob_of_lbl**2
    return impurity

def test_split(index, value, dataset):
    '''
    Split a dataframe on a condition in order to  represent a node on a tree
    Input: 
        index: column name of feature/variable to perform split
        value: value of feature value to split at
        dataset: dataset to be split
    Returns:
        left: dataset that is less than or equal to value if numeri or equal to value if categorical
        right: dataset that is greater than or not equal to the value
    '''
    left, right = pd.DataFrame(), pd.DataFrame()
    # test if variable is numeric or categorical
    if isinstance(dataset[index][0], int) or isinstance(dataset[index][0], float):
            left = dataset[dataset[index] <= value]
            right = dataset[dataset[index] > value]
    else:
        left = dataset[dataset[index] == value]
        right = dataset[dataset[index] != value]
    return left, right

def info_gain(left, right, current_uncertainty):
    """
    Using the Gini index, compute the information gain to quantify the best split point
    Inputs: 
        left: pandas dataframe of true branch from previous split
        right: pandas dataframe of right branch from previous split
        current_uncertainty: the gini-index of the parent node

    Returns: 
        In information gained based on the tree splits and previous gini-index
    """
    p = float(len(left)) / (len(left) + len(right))
    return current_uncertainty - p * gini_index(left) - (1 - p) * gini_index(right)


def find_best_split(dataset):
    '''
    Uses the Brieman method for categorical variables and each split point for numeric variables
    to test at every potential split point across the feature space to compute the gini index and information 
    gain in order to determine the optimal split point and splitting value given the present state

    Inputs: 
        dataset: pandas dataframe consisting of the variables and data
    Returns: 
        best_gain: the best gain score given the optimal split point
        best_var_split: the variable and value that produces the best split
    '''
    best_gain = 0  
    best_var_split = None  
    current_uncertainty = gini_index(dataset)

    for col in dataset.drop(['Label'], axis = 1):
        if type(dataset[col][0]) == numpy.float64:
            values = sorted(dataset[col].unique()[~numpy.isnan(dataset[col].unique())].tolist())
        else:
            values = dataset.groupby(col).mean()['Label'].sort_values().index.tolist()

        for val in values:  # for each value
            # try splitting the dataset
            true_rows, false_rows = test_split(col, val, dataset)

            # Skip this split if it doesn't divide the
            # dataset.
            if len(true_rows) == 0 or len(false_rows) == 0:
                continue

            # Calculate the information gain from this split
            gain = info_gain(true_rows, false_rows, current_uncertainty)

            if gain >= best_gain:
                best_gain, best_var_split = gain, (col, val)

    return best_gain, best_var_split

def class_counts(dataset):
    '''
    function to return the average proportion of results belonging to a particular class
    '''
    return (len(dataset[dataset['Label'] == 0]) / len(dataset) + len(dataset[dataset['Label'] == 1]) / len(dataset)) / 2

def build_tree(dataset):
    '''
    Fucntion that is called recursively to build a single tree
    Inputs: 
        dataset: dataset to perform tree building on
    Retruns: 
        best_var_split: tuple containing feature adn value to perform split
        left: dataset for left branch
        right: dataset for right branch
    '''
    if len(dataset) < 5:
        return class_counts(dataset)

    gain, best_var_split = find_best_split(dataset)
    index, value = best_var_split
    print(index, value, gain)
    # Base case: no further info gain
    # Since we can ask no further questions,
    # we'll return a leaf.
    if gain == 0:
        return class_counts(dataset)

    # If we reach here, we have found a useful feature / value
    # to partition on.
    true_rows, false_rows = test_split(index, value, dataset)
    true_rows = true_rows.reset_index(drop=True)
    false_rows = false_rows.reset_index(drop=True)

    # Recursively build the true branch.
    true_branch = build_tree(true_rows)

    # Recursively build the false branch.
    false_branch = build_tree(false_rows)

    # Return a Question node.
    # This records the best feature / value to ask at this point,
    # as well as the branches to follow
    # dependingo on the answer.
    return best_var_split, true_branch, false_branch
build_tree(df)
I13 0.0 0.014507766889948148
C20 5840adea 0.1007317969990627
C22 8ec974f4 0.35123966942148765
I12 2.0 0.42603550295857984
C26 d14e41ff 0.0
I5 1782.0 0.008934190851493162
I7 19.0 0.013055620095586487
C13 605bbc24 0.010911706067455007
C26 cc7a24ff 0.24489795918367355
I12 0.0 0.018828690566566025
C22 ad3062eb 0.12180573839966816
C23 55dd3565 0.21875
C26 f6f86eb4 0.0
C13 e40e52ae 0.06946786752305406
C2 207b2d81 0.085260051966354
C23 32c7478e 0.0738630368259999
I8 5.0 0.22721893491124273
I6 15.0 0.15333333333333318
C26 b4aa4b3d 0.0
C26 c84c4aec 0.0
C22 8ec974f4 0.05001017772076438
C1 8cf07265 0.1115255981476716
I11 8.0 0.11165121373060899
C26 bb4e2505 0.13265306122448978
C26 f159b6cb 0.0
I6 19.0 0.31604938271604943
C24 b34f3128 0.31999999999999984
I12 5.0 0.01674185695113256
C19 0ec8d23c 0.037680719462679674
C22 ad3062eb 0.15277777777777787
C26 eb9a9610 0.0
C25 001f3601 0.48





(('I13', 0.0),
 (('C20', '5840adea'),
  (('C22', '8ec974f4'), 0.5, 0.5),
  (('I12', 2.0), 0.5, 0.5)),
 (('I5', 1782.0),
  (('I7', 19.0),
   (('C13', '605bbc24'),
    (('C26', 'cc7a24ff'), 0.5, 0.5),
    (('I12', 0.0),
     (('C22', 'ad3062eb'), (('C23', '55dd3565'), 0.5, 0.5), 0.5),
     (('C13', 'e40e52ae'),
      0.5,
      (('C2', '207b2d81'),
       0.5,
       (('C23', '32c7478e'),
        (('I8', 5.0), 0.5, (('I6', 15.0), 0.5, 0.5)),
        0.5))))),
   (('C22', '8ec974f4'),
    0.5,
    (('C1', '8cf07265'),
     0.5,
     (('I11', 8.0),
      (('C26', 'bb4e2505'), 0.5, 0.5),
      (('I6', 19.0), 0.5, (('C24', 'b34f3128'), 0.5, 0.5)))))),
  (('I12', 5.0),
   (('C19', '0ec8d23c'), 0.5, (('C22', 'ad3062eb'), 0.5, 0.5)),
   (('C25', '001f3601'), 0.5, 0.5))))

Gradient Boost

We have shown above how to fit a single decision tree. As mentioned, single trees can frequently overfit the data - and generally don't have the best performance. For this reason a common method to use is gradient boost, where we iteratively fit additional trees based on the residual error of the last tree - in this way it is a sequential model. The theory behind this approach is to focus and improve on the 'difficult' datapoints to learn by our current model. We then proceed to iteratively develop trees using the residual error to improve the performance compared to a combination of all previous trees.

Utlimately, by sequentially building trees we have a well fit model. There are a couple of caveats to a gradient boosting approach that should be known prior to implementation and methods to reduce potential error: - Can be prone to over-fitting: As we are sequentially building a model, the model will fine tune towards more niche datapoints and can become over-fit when running the created series of trees on the test set. - Can take a long time to generate given the sequential nature of building trees and depth of each tree - In general, have higher performance as compared to a random forest approach with the same number of trees and tree dpeth

The gradient boost algorithm follows the routine listed below:

  1. Initialize the model with a constant value: $$F_0(x) = argmin \sum{i = 1}^{n} L(y_i, \gamma)$$ For binary classification we use the log loss probability function as our loss function $$L = y_i * log(p_i) + (1-y_i) * log(1-p_i)$$ For the first initialization we can compute the log-odds as $$log odds = \frac{# of true observations}{# of false observations}$$ in order to map this value into a probability we apply the logistic function $$probability = \frac{e^{-log-odds}}{1+e^{-log-odds}}$$ to get probability values. The model will then be initialized with the same value for all n data-points
  2. Start a loop iterating the number of desired trees desired for gradient boost
    • Compute the pseudo-residuals using the following equation: $$\frac{dL}{dF} = \frac{y_i - (1-y_i)e^{y_i}}{1+e^{y_i}}$$ Using the log-loss function this equation equals the observed - predicted values. * We use observed - predicted as it is the first derivative of our loss function defined above: $$\frac{dL}{dF} = \frac{y_i - (1-y_i)e^{y_i}}{1+e^{y_i}} = \frac{y_i * (1+e^{-y_i})}{1+e^{-y_i}} - \frac{1}{1+e^{y_i}}= y_i - p$$
    • Fit a tree using the pseudo-residuals as the target variable.
    • In our log-loss scenario, the output values at the leaf node for this second tree are defined using the following equation: $$\gamma_m = argmin \sum{i = 1}^{n} L(y, F_{m-1}(x_i) + \gamma h_m(x_i))$$ Freidman suggested that rather than computing a gamma value for the entire tree, it is more optimal to compute a gamma for each leaf node - this method is more frequently called TreeBoost. Implementing TreeBoost we get the following value at each leaf: $$\gamma_{jm} = Leaf-Node Value = \frac{\sum Residual_i}{\sum {Previous Probability_i * (1-Previous Probability_i)}$$ We cannot add the previous and the residual because the first is in terms of probabilty and the second is in terms of log odds
  3. Update the model using the leaf node value and a learning rate $\alpha$: $log-odds prediction = previous prediction + \alpha * leaf node value$ Above the probability is defined using the logistic regression equation. Implementation of the algorithm can be done using logistic regression or exponential
  4. Iterate the loop building additional trees based on the new pseudo-residual values

This implimentation follows Friedman's 'Algorithm 1' routine as noted in the article 'Greedy Function Approximation: A Gradient Boosting Machine'. [Friedman, J. (1999). Greedy Function Approximation: A Gradient Boosting Machine. Stanford: Stanford University.] Note in the implementation below the workflow is simplified by using the decision tree regressor and working in probabilities rather than log-odds and probabilities.

from sklearn.tree import DecisionTreeRegressor
from sklearn.preprocessing import OneHotEncoder
from sklearn import preprocessing
from sklearn.metrics import log_loss
from sklearn.model_selection import train_test_split

TOY_FILE = "./data/train.tiny.csv"
df = pd.read_csv(TOY_FILE)
df = df.drop(['Id'], axis = 1)

train_error = []
test_error = []

# Assemble features and labels
x = df.loc[:, df.columns != 'Label']
x = x.iloc[:, :12].fillna(x.mean())
x = x.join(x_1, how = 'inner')
x = pd.get_dummies(x).fillna(0)
y = df['Label']
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size = 0.2, random_state = 0)

# Learning Rate
alpha = 0.05

# Copy Labels over to dataframe
residual_df = pd.DataFrame(y_train, columns = ['Label'])
# Compute log-odds based on proportion of true/false
F = np.log(len(y_train.values == 1) / len(y_train.values == 0))
# Convert log-odds to probability using logistic function
prob =  np.exp(F) / (1 + np.exp(F))
prob_test = prob#np.exp(np.log(len(y_test.values == 1) / len(y_test.values == 0))) / (1 + np.exp(np.log(len(y_test.values == 1) / len(y_test.values == 0))))
# Copy over prediction and residuals
residual_df['residual'] = y_train - prob
residual_df['pred'] = prob

for i in range(100):
    dtr = DecisionTreeRegressor(max_depth=5, min_samples_leaf=5)
    # set desired value to residual
    y_train = residual_df['residual']
    # fit the decision tree
    dtr.fit(x_train, y_train)
    # copmute probability as the prior probability + learning-rate * prediction
    prob = prob + alpha * dtr.predict(x_train)
    prob_test = prob_test + alpha * dtr.predict(x_test)
    # update residual_df dataframe
    residual_df['residual'] = residual_df['Label'] - prob
    residual_df['prob'] = prob
    # print score based on log-loss
    train_error.append(log_loss(residual_df['Label'], prob))
    test_error.append(log_loss(y_test, prob_test))
#     print("Train Error: " + str(log_loss(residual_df['Label'], prob)))
#     print("Test Error: " + str(log_loss(y_test, prob_test)))
error = pd.DataFrame({'Train': train_error,
                      'Test': test_error})
plt.plot(error.index.values, error['Train'], label = 'Train Error')
plt.plot(error.index.values, error['Test'], label = 'Test Error')
plt.legend()
plt.ylabel('Log Loss')
plt.title('Loss for Gradient Boost Algorithm')
Text(0.5,1,'Loss for Gradient Boost Algorithm')
Drawing

Data Gathering and EDA

Download and unpack the dataset

# !curl -L -o criteo.tar.gz https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  4 4364M    4  217M    0     0   349k      0  3:33:15  0:10:38  3:22:37  327k^C
# !mkdir -p data
# !tar -xvzf criteo.tar.gz -C ./data
tar: Ignoring unknown extended header keyword `SCHILY.dev'
tar: Ignoring unknown extended header keyword `SCHILY.ino'
tar: Ignoring unknown extended header keyword `SCHILY.nlink'
readme.txt
tar: Ignoring unknown extended header keyword `LIBARCHIVE.creationtime'
tar: Ignoring unknown extended header keyword `SCHILY.dev'
tar: Ignoring unknown extended header keyword `SCHILY.ino'
tar: Ignoring unknown extended header keyword `SCHILY.nlink'
test.txt

gzip: stdin: unexpected end of file
tar: Unexpected EOF in archive
tar: Error is not recoverable: exiting now

Inspect the file that contains the training data

TRAIN_FILE="./data/train.txt"

print("First line: \n")
!head -n 5 $TRAIN_FILE

print("\nNumber of columns: ")
!head -n 1 $TRAIN_FILE | wc -w

# print("\nNumber of rows: ")
# !wc -l < $TRAIN_FILE
First line:

0   1   1   5   0   1382    4   15  2   181 1   2       2   68fd1e64    80e26c9b    fb936136    7b4723c4    25c83c98    7e0ccccf    de7995b8    1f89b562    a73ee510    a8cd5504    b2cb9c98    37c9c164    2824a5f6    1adce6ef    8ba8b39a    891b62e7    e5ba7672    f54016b9    21ddcdc9    b1252a9d    07b5194c        3a171ecb    c5c50484    e8b83407    9727dd16
0   2   0   44  1   102 8   2   2   4   1   1       4   68fd1e64    f0cf0024    6f67f7e5    41274cd7    25c83c98    fe6b92e5    922afcc0    0b153874    a73ee510    2b53e5fb    4f1b46f3    623049e6    d7020589    b28479f6    e6c5b5cd    c92f3b61    07c540c4    b04e4670    21ddcdc9    5840adea    60f6221e        3a171ecb    43f13e8b    e8b83407    731c3655
0   2   0   1   14  767 89  4   2   245 1   3   3   45  287e684f    0a519c5c    02cf9876    c18be181    25c83c98    7e0ccccf    c78204a1    0b153874    a73ee510    3b08e48b    5f5e6091    8fe001f4    aa655a2f    07d13a8f    6dc710ed    36103458    8efede7f    3412118d            e587c466    ad3062eb    3a171ecb    3b183c5c        
0       893         4392        0   0   0       0           68fd1e64    2c16a946    a9a87e68    2e17d6f6    25c83c98    fe6b92e5    2e8a689b    0b153874    a73ee510    efea433b    e51ddf94    a30567ca    3516f6e6    07d13a8f    18231224    52b8680f    1e88c74f    74ef3502            6b3a5ca6        3a171ecb    9117a34a        
0   3   -1      0   2   0   3   0   0   1   1       0   8cf07265    ae46a29d    c81688bb    f922efad    25c83c98    13718bbd    ad9fa255    0b153874    a73ee510    5282c137    e5d8af57    66a76a26    f06c53ac    1adce6ef    8ff4b403    01adbab4    1e88c74f    26b3c7a7            21c9516a        32c7478e    b34f3128

Number of columns: 
38

Create RDDs from the parsed input files and cache them so that the parsing has to occur only once.

# imports
import re
import time
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

from pyspark.sql import types
import pyspark.sql
import pyspark.sql.functions

# start Spark Session
from pyspark.sql import SparkSession
app_name = "hw3_notebook"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext

# create RDDs
trainRDD = sc.textFile("./data/train.txt")
testRDD = sc.textFile("./data/test.txt")
toytrainRDD = sc.textFile('./data/toy_train.txt')
# store path to notebook
PWD = !pwd
PWD = PWD[0]

FIELDS = ['I1','I2','I3','I4','I5','I6','I7','I8','I9','I10','I11','I12','I13',
          'C1','C2','C3','C4','C5','C6','C7','C8','C9','C10','C11','C12','C13','C14',
          'C15','C16','C17','C18','C19','C20','C21','C22','C23','C24','C25','C26','Label']

# Generate 80/20 (pseudo)random train/test split 
trainRDD, heldOutRDD = fullRDD.randomSplit([0.8,0.2], seed = 1)
print(f"... leaving out {heldOutRDD.count()} records for evaluation and assigning {trainRDD.count()} records for training.")
toyRDD = sc.textFile('data/toy1000.txt')
print(f"Number of records in toy data: {toyRDD.count()} ...")

toytrainRDD.take(1)
['0\t1.0\t1\t5.0\t0.0\t1382.0\t4.0\t15.0\t2.0\t181.0\t1.0\t2.0\t\t2.0\t68fd1e64\t80e26c9b\tfb936136\t7b4723c4\t25c83c98\t7e0ccccf\tde7995b8\t1f89b562\ta73ee510\ta8cd5504\tb2cb9c98\t37c9c164\t2824a5f6\t1adce6ef\t8ba8b39a\t891b62e7\te5ba7672\tf54016b9\t21ddcdc9\tb1252a9d\t07b5194c\t\t3a171ecb\tc5c50484\te8b83407\t9727dd16']
def parse_feature(line):
    fields = line.split("\t")
    label, features = int(fields[0]), fields[1:]
        # remove empty features introduced by extra tabs
    return (features[i], 1)

def parse_empty(line):
    fields = line.split("\t")
    label, features = int(fields[0]), fields[1:]
        # remove empty features introduced by extra tabs
    if features[i] == '':
        yield (1, 1)

# helper functions
def parse(line):
    """
    Map line --> tuple of (features, label)
    """
    fields = line.split('\t')
    features,label = fields[1:], fields[0]
    return(features, label)

def edit_data_types(line):
    """
    Map tuple of (features, label) --> tuple of (formated features, label)

    * '' is replaced with 'null'
    * numerical fields are converted to integers
    * make label column numeric
    """
    features, label = line[0], line[1]
    formated_features = []
    for i, value in enumerate(features):
        if value == '':
            formated_features.append('null')
        else:
            if i < 13:
                formated_features.append(float(value)) 
            else:
                formated_features.append(value)
    return (formated_features, int(label))
trainRDDCached = trainRDD.map(parse).cache()
testRDDCached = testRDD.map(parse).cache()
# Parsing, making '' as np.nan and converting numerical features and output label to int
# trainRDDCached = trainRDD.map(parse).map(edit_data_types).cache()
toyRDDCached = toyRDD.map(parse).map(edit_data_types).cache()

We would expect there to be a class imbalance with many more unclicked ads than clicked ads, but let's see how extreme it is. We can perform this counting with a map operation that emits the value of the label and a count of 1 and a reduce operation that sums the count.

trainRDDCached.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y).collect()
[(0, 34095179), (1, 11745438)]

So approximately 74.4% of the data is the negative class.

col = !head -n 1 $TRAIN_FILE | wc -w
total = !wc -l < $TRAIN_FILE
total = int(total[0])
num_unique = []
num_nan = []
for i in range(39):
    if i == 0:
        continue
    RDD = toytrainRDD.map(parse_feature).reduceByKey(lambda a,b: a + b).cache()
    unique = RDD.count()
    num_unique.append(unique)
    blank = toytrainRDD.flatMap(parse_empty).count()
    num_nan.append(blank)
    print('Unique values in column ' + str(i) + ': ' + str(unique) + ' Number of empty values in column: ' +  str(blank))
Unique values in column 1: 344 Number of empty values in column: 0
Unique values in column 2: 148 Number of empty values in column: 449
Unique values in column 3: 56 Number of empty values in column: 420
Unique values in column 4: 1431 Number of empty values in column: 93
Unique values in column 5: 408 Number of empty values in column: 493
Unique values in column 6: 119 Number of empty values in column: 93
Unique values in column 7: 56 Number of empty values in column: 2
Unique values in column 8: 422 Number of empty values in column: 93
Unique values in column 9: 6 Number of empty values in column: 889
Unique values in column 10: 31 Number of empty values in column: 93
Unique values in column 11: 16 Number of empty values in column: 1554
Unique values in column 12: 84 Number of empty values in column: 420
Unique values in column 13: 79 Number of empty values in column: 0
Unique values in column 14: 252 Number of empty values in column: 0
Unique values in column 15: 1293 Number of empty values in column: 66
Unique values in column 16: 1043 Number of empty values in column: 66
Unique values in column 17: 30 Number of empty values in column: 0
Unique values in column 18: 7 Number of empty values in column: 251
Unique values in column 19: 1164 Number of empty values in column: 0
Unique values in column 20: 39 Number of empty values in column: 0
Unique values in column 21: 2 Number of empty values in column: 0
Unique values in column 22: 908 Number of empty values in column: 0
Unique values in column 23: 926 Number of empty values in column: 0
Unique values in column 24: 1239 Number of empty values in column: 66
Unique values in column 25: 824 Number of empty values in column: 0
Unique values in column 26: 20 Number of empty values in column: 0
Unique values in column 27: 819 Number of empty values in column: 0
Unique values in column 28: 1159 Number of empty values in column: 66
Unique values in column 29: 9 Number of empty values in column: 0
Unique values in column 30: 534 Number of empty values in column: 0
Unique values in column 31: 201 Number of empty values in column: 965
Unique values in column 32: 4 Number of empty values in column: 965
Unique values in column 33: 1204 Number of empty values in column: 66
Unique values in column 34: 7 Number of empty values in column: 1631
Unique values in column 35: 12 Number of empty values in column: 0
Unique values in column 36: 729 Number of empty values in column: 66
Unique values in column 37: 33 Number of empty values in column: 965
Unique values in column 38: 554 Number of empty values in column: 965
# helper function 'null' to np.nan for pandas df 
def null_to_nan(line):
    """
    converts "null" to np.nan in RDD
    """
    features, label = line[0], line[1]
    formated_features = []
    for i, value in enumerate(features):
        if value == 'null':
            formated_features.append(np.nan)
        else:
            formated_features.append(value)
    return (formated_features, label)

# put the toy RDD into a pandas dataframe for EDA charting
trainRDDtoPandas = toyRDDCached.map(null_to_nan) \
                                .map(lambda x: np.append(x[0], [x[1]])) \
                                .collect()

FIELDS = ['I1','I2','I3','I4','I5','I6','I7','I8','I9','I10','I11','I12','I13',
          'C1','C2','C3','C4','C5','C6','C7','C8','C9','C10','C11','C12','C13','C14',
          'C15','C16','C17','C18','C19','C20','C21','C22','C23','C24','C25','C26','Label']

toy_df = pd.DataFrame(trainRDDtoPandas, columns=FIELDS)
toy_df.head()
I1 I2 I3 I4 I5 I6 I7 I8 I9 I10 ... C18 C19 C20 C21 C22 C23 C24 C25 C26 Label
0 0.0 478.0 13.0 nan 3396.0 194.0 11.0 13.0 312.0 0.0 ... 395856b0 21ddcdc9 b1252a9d 8e4884c0 nan 423fab69 b936bfbe 001f3601 f2fc1d6e 1
1 2.0 0.0 nan nan 1209.0 0.0 2.0 1.0 0.0 1.0 ... 526e8765 nan nan 8b7fb864 nan 32c7478e 45b2acf4 nan nan 0
2 nan 1.0 1.0 8.0 489.0 66.0 0.0 11.0 8.0 nan ... 8f9b4e88 nan nan 050a23dc nan 32c7478e 8a3cfad4 nan nan 0
3 0.0 179.0 7.0 8.0 996.0 67.0 6.0 44.0 344.0 0.0 ... 2804effd nan nan 723b4dfd nan 32c7478e b34f3128 nan nan 0
4 2.0 2.0 10.0 27.0 1101.0 29.0 11.0 42.0 241.0 1.0 ... e88ffc9d 21ddcdc9 b1252a9d dca9a28d ad3062eb bcdee96c 3fdb382b cb079c2d 1c2df582 1

5 rows × 40 columns

# TOY DATA
# counting records for each class 
count_label_0 = toyRDDCached.filter(lambda x: x[1] == 0).count()
count_label_1 = toyRDDCached.filter(lambda x: x[1] == 1).count()
total = count_label_0 + count_label_1

print(f"{np.round(count_label_0/total*100, 2)} % of the records have label=0 and {np.round(count_label_1/total*100, 2)} % have label=1...")
72.3 % of the records have label=0 and 27.7 % have label=1...
def get_pct_nulls_in_column(dataRDD, var_position):
    """
    Counts the % nulls in a column 
    """

    null_count = dataRDD.map(lambda x: x[0][var_position]) \
                             .filter(lambda x: x == 'null').count()
    total_count = dataRDD.map(lambda x: x[0][var_position]).count()

    return null_count/total_count*100
# TOY DATA
for var_position, var in enumerate(FIELDS):

    if var_position < 39:
        null_pct = get_pct_nulls_in_column(toyRDDCached, var_position)
        print("FEATURE {}: {}% is null".format(var, np.round(null_pct,2)))
FEATURE I1: 43.2% is null
FEATURE I2: 0.0% is null
FEATURE I3: 21.3% is null
FEATURE I4: 22.1% is null
FEATURE I5: 2.4% is null
FEATURE I6: 23.0% is null
FEATURE I7: 3.7% is null
FEATURE I8: 0.0% is null
FEATURE I9: 3.7% is null
FEATURE I10: 43.2% is null
FEATURE I11: 3.7% is null
FEATURE I12: 77.4% is null
FEATURE I13: 22.1% is null
FEATURE C1: 0.0% is null
FEATURE C2: 0.0% is null
FEATURE C3: 3.4% is null
FEATURE C4: 3.4% is null
FEATURE C5: 0.0% is null
FEATURE C6: 13.4% is null
FEATURE C7: 0.0% is null
FEATURE C8: 0.0% is null
FEATURE C9: 0.0% is null
FEATURE C10: 0.0% is null
FEATURE C11: 0.0% is null
FEATURE C12: 3.4% is null
FEATURE C13: 0.0% is null
FEATURE C14: 0.0% is null
FEATURE C15: 0.0% is null
FEATURE C16: 3.4% is null
FEATURE C17: 0.0% is null
FEATURE C18: 0.0% is null
FEATURE C19: 42.2% is null
FEATURE C20: 42.2% is null
FEATURE C21: 3.4% is null
FEATURE C22: 74.5% is null
FEATURE C23: 0.0% is null
FEATURE C24: 3.4% is null
FEATURE C25: 42.2% is null
FEATURE C26: 42.2% is null
# RDD version 
def get_stats(dataRDD, var_position):
    """
    Get statistics for numeric variables 
    stats: mean, median, variance, min, max 
    """

    mean = dataRDD.map(lambda x: x[0][var_position]).filter(lambda x: x != 'null').mean() 
    variance = dataRDD.map(lambda x: x[0][var_position]).filter(lambda x: x != 'null').variance() 
    minimum = dataRDD.map(lambda x: x[0][var_position]).filter(lambda x: x != 'null').min() 
    maximum = dataRDD.map(lambda x: x[0][var_position]).filter(lambda x: x != 'null').max() 

    return mean, variance, minimum, maximum
# save the means in a dictionary
mean_dict_toy = {}
st_dev_dict_toy = {}

# get summary stats with RDDs
for var_position, var in enumerate(FIELDS):
    if var_position < 13:
        mean, variance, minimum, maximum = get_stats(toyRDDCached, var_position)
        print("FEATURE {}: \t mean={}, \t variance={}, \t min={}, \t max={}".format(var, np.round(mean, 2), np.round(variance, 2), minimum, maximum))
        mean_dict_toy[var_position] = mean
        st_dev_dict_toy[var_position] = np.sqrt(variance)
FEATURE I1:      mean=3.17,      variance=33.98,     min=0.0,    max=55.0
FEATURE I2:      mean=114.72,    variance=179230.84,     min=-2.0,   max=5123.0
FEATURE I3:      mean=18.78,     variance=2026.69,   min=0.0,    max=648.0
FEATURE I4:      mean=7.43,      variance=86.02,     min=0.0,    max=77.0
FEATURE I5:      mean=18392.77,      variance=4908735552.87,     min=0.0,    max=1002457.0
FEATURE I6:      mean=95.23,     variance=65007.31,      min=0.0,    max=4304.0
FEATURE I7:      mean=17.94,     variance=8794.35,   min=0.0,    max=2614.0
FEATURE I8:      mean=12.96,     variance=183.6,     min=0.0,    max=49.0
FEATURE I9:      mean=102.42,    variance=38152.34,      min=0.0,    max=2711.0
FEATURE I10:     mean=0.64,      variance=0.5,   min=0.0,    max=4.0
FEATURE I11:     mean=2.78,      variance=23.83,     min=0.0,    max=60.0
FEATURE I12:     mean=1.19,      variance=40.19,     min=0.0,    max=84.0
FEATURE I13:     mean=7.99,      variance=116.35,    min=0.0,    max=97.0
# Pandas version 
num_columns = ['I1','I2','I3','I4','I5','I6','I7','I8','I9','I10','I11','I12','I13']
toy_df_num = toy_df[num_columns].astype(np.float)
toy_df_num.describe()
I1 I2 I3 I4 I5 I6 I7 I8 I9 I10 I11 I12 I13
count 568.000000 1000.000000 787.000000 779.000000 9.760000e+02 770.000000 963.000000 1000.000000 963.000000 568.000000 963.000000 226.000000 779.000000
mean 3.167254 114.719000 18.780178 7.427471 1.839277e+04 95.231169 17.942887 12.961000 102.419522 0.642606 2.778816 1.194690 7.993582
std 5.834144 423.568469 45.047402 9.280591 7.009829e+04 255.131025 93.826907 13.556669 195.427732 0.705795 4.884305 6.353803 10.793634
min 0.000000 -2.000000 0.000000 0.000000 0.000000e+00 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000
25% 0.000000 0.000000 2.000000 2.000000 3.032500e+02 6.000000 1.000000 2.000000 10.500000 0.000000 1.000000 0.000000 2.000000
50% 1.000000 2.000000 6.000000 4.000000 2.696000e+03 28.000000 3.000000 8.000000 39.000000 1.000000 1.000000 0.000000 4.000000
75% 4.000000 32.000000 18.000000 10.000000 9.703500e+03 92.750000 12.000000 20.000000 101.000000 1.000000 3.000000 0.000000 10.000000
max 55.000000 5123.000000 648.000000 77.000000 1.002457e+06 4304.000000 2614.000000 49.000000 2711.000000 4.000000 60.000000 84.000000 97.000000
# Take a look at histograms for each numeric feature 
toy_df_num.hist(figsize=(20,15), bins=15)
plt.show()
Drawing
# plot boxplots of each feature vs. the outcome
fig, ax_grid = plt.subplots(5, 3, figsize=(20,15))
y = toy_df['Label']
for idx, feature in enumerate(num_columns):
    x = toy_df_num[feature]
    sns.boxplot(x, y, ax=ax_grid[idx//3][idx%3], orient='h', linewidth=.5)
    ax_grid[idx//3][idx%3].invert_yaxis()
fig.suptitle("BoxPlots by Label", fontsize=15, y=0.9)
plt.show()
Drawing
# Correlation between numerical features
corr = toy_df_num.corr()
fig, ax = plt.subplots(figsize=(7, 7))
mask = np.zeros_like(corr, dtype=np.bool)
mask[np.triu_indices_from(mask)] = True
cmap = sns.diverging_palette(10, 240, as_cmap=False)
sns.heatmap(corr, mask=mask, cmap=cmap, center=0, linewidths=.2)
plt.title("Correlations between features")
plt.show()
Drawing
def count_categories(dataRDD, var, var_position, top):
    """
    input: RDD, name and position of a categorical variable 

    output: 
    * number of unique categories in the variable
    * counts of each category occurance by label
    """

    # counting category occurance within each categorical feature 
    count_per_category = dataRDD.map(lambda x: ( x[0][var_position], 1)) \
                                           .reduceByKey(lambda x,y: x+y) \
                                           .sortBy(lambda x: -x[1])

    # counting number of unique values within the categorical variable
    num_unique_values = count_per_category.map(lambda x: x[0]).distinct().count()

    print('Unique values within the category:', num_unique_values)
    print(' ')
    top_x = count_per_category.take(top)
    print('Top {} categories by count:'.format(top))
    for i in top_x: 
        print('Category: {}; Count: {}'.format(i[0],i[1]))
    print(' ')
# TOY DATA
for var_position, var in enumerate(FIELDS):

    if var_position > 12 and var_position < 39:
        print(" ")
        print("VARIABLE {}".format(var))
        print(" ")
        count_categories(toyRDDCached, var, var_position=var_position, top=10)
VARIABLE C1

Unique values within the category: 57

Top 10 categories by count:
Category: 05db9164; Count: 485
Category: 68fd1e64; Count: 146
Category: 5a9ed9b0; Count: 103
Category: 8cf07265; Count: 51
Category: be589b51; Count: 41
Category: 5bfa8ab5; Count: 27
Category: f473b8dc; Count: 20
Category: 87552397; Count: 15
Category: 39af2607; Count: 13
Category: 9a89b36c; Count: 9


VARIABLE C2

Unique values within the category: 193

Top 10 categories by count:
Category: 38a947a1; Count: 114
Category: 1cfdf714; Count: 50
Category: 287130e0; Count: 46
Category: 38d50e09; Count: 46
Category: 207b2d81; Count: 37
Category: 09e68b86; Count: 33
Category: 421b43cd; Count: 33
Category: 4f25e98b; Count: 29
Category: 89ddfee8; Count: 27
Category: 58e67aaf; Count: 27


VARIABLE C3

Unique values within the category: 771

Top 10 categories by count:
Category: null; Count: 34
Category: d032c263; Count: 15
Category: 02cf9876; Count: 13
Category: b00d1501; Count: 12
Category: 77f2f2e5; Count: 10
Category: aa8c1539; Count: 10
Category: 2cbec47f; Count: 8
Category: 7da86e4b; Count: 7
Category: 9143c832; Count: 7
Category: b1ecc6c4; Count: 7


VARIABLE C4

Unique values within the category: 660

Top 10 categories by count:
Category: null; Count: 34
Category: 29998ed1; Count: 28
Category: c18be181; Count: 28
Category: d16679b9; Count: 25
Category: 85dd697c; Count: 17
Category: 13508380; Count: 15
Category: f922efad; Count: 14
Category: e3cc371a; Count: 8
Category: 3e2bfbda; Count: 8
Category: b733e495; Count: 7


VARIABLE C5

Unique values within the category: 26

Top 10 categories by count:
Category: 25c83c98; Count: 653
Category: 4cf72387; Count: 170
Category: 43b19349; Count: 56
Category: 384874ce; Count: 41
Category: 0942e0a7; Count: 17
Category: 30903e74; Count: 17
Category: f281d2a7; Count: 13
Category: b0530c50; Count: 7
Category: b2241560; Count: 6
Category: 26eb6185; Count: 2


VARIABLE C6

Unique values within the category: 7

Top 10 categories by count:
Category: 7e0ccccf; Count: 388
Category: fbad5c96; Count: 207
Category: fe6b92e5; Count: 185
Category: null; Count: 134
Category: 6f6d9be8; Count: 37
Category: 13718bbd; Count: 28
Category: 3bf701e7; Count: 21


VARIABLE C7

Unique values within the category: 723

Top 10 categories by count:
Category: 1c86e0eb; Count: 30
Category: 7195046d; Count: 12
Category: 90a2c015; Count: 10
Category: 81bb0302; Count: 10
Category: 468a0854; Count: 8
Category: dc7659bd; Count: 7
Category: 5e64ce5f; Count: 6
Category: d2dbdfe6; Count: 6
Category: f00bddf8; Count: 6
Category: 88002ee1; Count: 6


VARIABLE C8

Unique values within the category: 35

Top 10 categories by count:
Category: 0b153874; Count: 597
Category: 5b392875; Count: 163
Category: 1f89b562; Count: 69
Category: 37e4aa92; Count: 37
Category: 51d76abe; Count: 25
Category: 062b5529; Count: 20
Category: c8ddd494; Count: 12
Category: 6c41e35e; Count: 12
Category: 985e3fcb; Count: 10
Category: 64523cfa; Count: 8


VARIABLE C9

Unique values within the category: 2

Top 10 categories by count:
Category: a73ee510; Count: 900
Category: 7cc72ec2; Count: 100


VARIABLE C10

Unique values within the category: 613

Top 10 categories by count:
Category: 3b08e48b; Count: 216
Category: fbbf2c95; Count: 16
Category: 5ba575e7; Count: 10
Category: efea433b; Count: 10
Category: 67eea4ef; Count: 6
Category: 49d1ad89; Count: 5
Category: 03e48276; Count: 5
Category: 474773a7; Count: 5
Category: f9065d00; Count: 5
Category: 935a36f0; Count: 5


VARIABLE C11

Unique values within the category: 601

Top 10 categories by count:
Category: 755e4a50; Count: 36
Category: 4d8549da; Count: 15
Category: 7f8ffe57; Count: 14
Category: e51ddf94; Count: 14
Category: b7094596; Count: 11
Category: 1054ae5c; Count: 10
Category: a7b606c4; Count: 10
Category: 8b94178b; Count: 9
Category: 0f736a0c; Count: 9
Category: 7e40f08a; Count: 8


VARIABLE C12

Unique values within the category: 736

Top 10 categories by count:
Category: null; Count: 34
Category: 6aaba33c; Count: 28
Category: dfbb09fb; Count: 15
Category: 8fe001f4; Count: 13
Category: e0d76380; Count: 12
Category: 9f32b866; Count: 10
Category: d8c29807; Count: 10
Category: 21a23bfe; Count: 8
Category: ed397d6b; Count: 7
Category: ae1bb660; Count: 7


VARIABLE C13

Unique values within the category: 549

Top 10 categories by count:
Category: 5978055e; Count: 36
Category: 3516f6e6; Count: 16
Category: 51b97b8f; Count: 15
Category: 1aa94af3; Count: 14
Category: 46f42a63; Count: 14
Category: 740c210d; Count: 12
Category: 025225f2; Count: 11
Category: 1f9d2c38; Count: 11
Category: 6e5da64f; Count: 10
Category: eae197fd; Count: 10


VARIABLE C14

Unique values within the category: 18

Top 10 categories by count:
Category: b28479f6; Count: 367
Category: 07d13a8f; Count: 320
Category: 1adce6ef; Count: 156
Category: 64c94865; Count: 47
Category: cfef1c29; Count: 31
Category: 051219e6; Count: 26
Category: d2dfe871; Count: 11
Category: 8ceecbc8; Count: 11
Category: f862f261; Count: 8
Category: ad1cc976; Count: 7


VARIABLE C15

Unique values within the category: 553

Top 10 categories by count:
Category: 2d0bb053; Count: 21
Category: d345b1a0; Count: 14
Category: 310d155b; Count: 14
Category: 42b3012c; Count: 13
Category: 52baadf5; Count: 11
Category: 10040656; Count: 11
Category: f3002fbd; Count: 10
Category: 10935a85; Count: 9
Category: e1ac77f7; Count: 9
Category: 3628a186; Count: 9


VARIABLE C16

Unique values within the category: 707

Top 10 categories by count:
Category: null; Count: 34
Category: b041b04a; Count: 28
Category: 84898b2a; Count: 15
Category: 36103458; Count: 13
Category: 1203a270; Count: 12
Category: 31ca40b6; Count: 10
Category: c64d548f; Count: 10
Category: 587267a3; Count: 8
Category: c4de5bba; Count: 8
Category: 01adbab4; Count: 7


VARIABLE C17

Unique values within the category: 9

Top 10 categories by count:
Category: e5ba7672; Count: 497
Category: d4bb7bd8; Count: 115
Category: 07c540c4; Count: 109
Category: 3486227d; Count: 67
Category: 1e88c74f; Count: 53
Category: 776ce399; Count: 51
Category: 8efede7f; Count: 44
Category: 27c07bd6; Count: 40
Category: 2005abd1; Count: 24


VARIABLE C18

Unique values within the category: 392

Top 10 categories by count:
Category: e88ffc9d; Count: 43
Category: 891589e7; Count: 37
Category: 2804effd; Count: 33
Category: c21c3e4c; Count: 27
Category: 5bb2ec8e; Count: 23
Category: 582152eb; Count: 22
Category: 5aed7436; Count: 22
Category: 7ef5affa; Count: 18
Category: 395856b0; Count: 16
Category: fffe2a63; Count: 12


VARIABLE C19

Unique values within the category: 128

Top 10 categories by count:
Category: null; Count: 422
Category: 21ddcdc9; Count: 358
Category: 55dd3565; Count: 20
Category: 712d530c; Count: 9
Category: 5b885066; Count: 8
Category: 9437f62f; Count: 7
Category: cf99e5de; Count: 7
Category: 04de9d96; Count: 6
Category: 3014a4b1; Count: 5
Category: 1d1eb838; Count: 5


VARIABLE C20

Unique values within the category: 4

Top 10 categories by count:
Category: null; Count: 422
Category: a458ea53; Count: 209
Category: b1252a9d; Count: 187
Category: 5840adea; Count: 182


VARIABLE C21

Unique values within the category: 724

Top 10 categories by count:
Category: null; Count: 34
Category: 723b4dfd; Count: 28
Category: 0014c32a; Count: 15
Category: e587c466; Count: 13
Category: 73d06dde; Count: 12
Category: 5f957280; Count: 10
Category: dfcfc3fa; Count: 10
Category: c2a93b37; Count: 8
Category: deaf6b52; Count: 7
Category: 0429f84b; Count: 7


VARIABLE C22

Unique values within the category: 6

Top 10 categories by count:
Category: null; Count: 745
Category: ad3062eb; Count: 145
Category: c9d4222a; Count: 94
Category: 8ec974f4; Count: 7
Category: 78e2e389; Count: 6
Category: c0061c6d; Count: 3


VARIABLE C23

Unique values within the category: 12

Top 10 categories by count:
Category: 32c7478e; Count: 447
Category: 3a171ecb; Count: 199
Category: 423fab69; Count: 111
Category: bcdee96c; Count: 81
Category: be7c41b4; Count: 63
Category: c7dc6720; Count: 47
Category: 55dd3565; Count: 20
Category: dbb486d7; Count: 16
Category: c3dc6cef; Count: 7
Category: 93bad2c0; Count: 5


VARIABLE C24

Unique values within the category: 489

Top 10 categories by count:
Category: 3fdb382b; Count: 56
Category: b34f3128; Count: 50
Category: 1793a828; Count: 49
Category: null; Count: 34
Category: 3b183c5c; Count: 31
Category: aee52b6f; Count: 25
Category: 45ab94c8; Count: 22
Category: 9117a34a; Count: 19
Category: df487a73; Count: 16
Category: 8fc66e78; Count: 15


VARIABLE C25

Unique values within the category: 30

Top 10 categories by count:
Category: null; Count: 422
Category: 001f3601; Count: 135
Category: e8b83407; Count: 112
Category: ea9a246c; Count: 84
Category: cb079c2d; Count: 44
Category: f0f449dd; Count: 35
Category: 445bbe3b; Count: 30
Category: 010f6491; Count: 29
Category: 2bf691b1; Count: 28
Category: 9b3e8820; Count: 28


VARIABLE C26

Unique values within the category: 375

Top 10 categories by count:
Category: null; Count: 422
Category: 49d68486; Count: 38
Category: 2fede552; Count: 22
Category: c27f155b; Count: 19
Category: c84c4aec; Count: 15
Category: 984e0db0; Count: 11
Category: aa5f0a15; Count: 11
Category: b7d9c3bc; Count: 9
Category: 6c27a535; Count: 7
Category: 56be3401; Count: 6
# Bar graphs of category counts within each categorical variable by label
fig, ax = plt.subplots(13, 2,figsize=(15,40))
plt.tight_layout()
fno = 0
# axes are in a two-dimensional array, indexed by [row, col]
for i in range(13):
    for j in range(2):
        fno += 1
        col = "C" + str(fno)
        sns.countplot(y=col, hue="Label", data=toy_df, palette="Greens_d",
                  order=toy_df[col].value_counts().iloc[:10].index,ax=ax[i,j]).set_title('Count By C'+str(fno)+' Label')
Drawing

Section 4 - Algorithm Implementation

In this section we build a gradient-boosted decision tree using Spark ML. The spark implementation follows the Friedman algorithm routine previously discussed in section 2. The documentation, along with a link to the API, can be found here: https://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-classifier.

A lengthier tutorial for building pipelines and models using Spark can be found here: https://towardsdatascience.com/machine-learning-with-pyspark-and-mllib-solving-a-binary-classification-problem-96396065d2aa. That tutorial borrows from another notebook located here: https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html.

Because Gradient Boost is a sequential model (need tree_1 to compute tree_2) it is not as prone to parallel implementation as random forests are. In the Spark implementation the individual tree building tasks are parallelized. This notion causes the training times in Gradient Boost to exceed those of Random Forest. Parallelizing individual tree building in Spark follows the PLANET model as outlined by Google [Panda, B., Herbach, J., Basu, S., & Bayardo, R. (2009). PLANET: Massively Parallel Learning of Tree Ensembles with MapReduce. Lyon: ACM.]. This technique uses a central controller that provides the tree structure and decides on the optimum split given the information gain from various split points. As detailed in section 2, tree learning is a greedy algorithm in which all split points across all variables must be considered to determine the optimum split (Brieman's technique aids in this greedy approach slightly for unordered variables). It is at this step that parallel processing can help in order to analyze all potential splits and determine the best. Listed below is a detail of the map and reduce phase broken up by ordered or unordered variables.

Controller: The controller lies outside of a map-reduce job and keeps track of the overall tree structure, along with the training set for each node (to compute the split decision) and the size of the splits. The controller proceeds throughout the training set calling map-reduce jobs at each node in order to find the most viable split point

Map Phase: - Ordered lists: From before we know that we need to split at every point along an ordered list in order to find the true optimum split point. Because MapReduce breaks up the input data - it isn't possible to assess where the split point lies. Therefore PLANET simplifies the process slightly by (prior to the distributed job) computing equi-distant histogram for every ordered feature. The splits are then taken at the histogram buckets. While there is a tradeoff using this approach for absolute best split point - the computational cost savings are generally worth it. The map phase then splits at each feature and each value and emits a key, value pair of the form ((node, feature, split point), (sum of y values, sum of squared y values, total length that have value less than s)). - Unordered lists: The map phase outputs key, value pairs of the form ((node, feature), (value, ((sum of y values, sum of squared y values, total length that have value less than s))). The key difference between these two scenarios is that the split value is located in the value for unordered lists and in the key for ordered lists. This falls back to Brieman's technique and requiring unordered lists to be sorted by y value to determine split points. - The aggregated tuple (sum of y values, sum of squared y values, total length that have value less than s) is also output with a key value of node in order for the reducer phase to compare information gain on the various splits

Reduce Phase: - The reducer is split into a series of summations based on the key value format: node, (node, feature), (node, feature, split point). Based on this data it then sums over the aggregated tuple format emitted in all three scenarios in order to determine the optimum split point - The output of the reducer is the best split it has seen for each node. This information along with the average y value and size of left and right branches is emited to the controller to further build the tree structure

Mentioned above, for Gradient Boosted Trees Frieman recommended a technique he called TreeBoost in which each leaf was given a different optimum constant update, rather than an average for the overall tree [Friedman, J. (1999). Greedy Function Approximation: A Gradient Boosting Machine. Stanford: Stanford University.]. In the current Spark GradientBoost algorithm, this approach is not utilized and a single parameter is used for the entire tree structure. Implementing TreeBoost is likely to improve overall model accuracy and is saved for a later step.

Below we utilize the Spark ML library in order to fit a gradient boosted tree on the CLR dataset.

Preprocessing

First, we import the modules we'll need and read the data into a dataframe.

import pyspark.sql.functions

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

ON_CLUSTER = True # flag to determine whether to read from a GCP bucket
if ON_CLUSTER:
    df = spark.read.csv("gs://w61-ucb2/data/train.txt", sep="\t", inferSchema=True)
else:
    df = spark.read.csv("./data/train.txt", sep="\t", inferSchema=True)

    # create a small sample for local work
    df = df.sample(False, .0001, 42)

Next, we need to transform the data into a format suitable for a GBTClassifier. We'll do this by creating a pipeline of transformations.

stages = [] # the stages in the pipeline

We first convert the categorical features into one-hot encoded vectors. However, as we saw in the EDA above, some features have many millions of unique values. It is infeasible to one-hot encode all of these due to memory constraints, and these features would likely make for poor split points in any case. Thus, we'll initially use only the categorical features that have fewer than 32 unique values, which is the default number of maximum bins used by the GBTClassifier.

categoricalColumns = [f"_c{i}" for i in range(14, 40)]
countsByCol = []

# get approximate distinct counts for each of the categorical features
for col in categoricalColumns:
    cnt = df.agg(
        pyspark.sql.functions.approx_count_distinct(df[col]).alias(col)
    ).collect()

    # store the columns name and the count
    countsByCol.append( (col, cnt[0][col]) )
# get features that have fewer than 50 unique values
categoricalColumnsToEncode = [t[0] for t in countsByCol if t[1] < 32]
categoricalColumnsToEncode
['_c19', '_c22', '_c27', '_c30', '_c33', '_c35', '_c36']
# encode the subset of categorical columns
for categoricalCol in categoricalColumnsToEncode:

    # index the categories
    stringIndexer = StringIndexer(
        inputCol = categoricalCol, 
        outputCol = categoricalCol + "Index",
        handleInvalid="keep"
    )

    # convert the indices into binary SparseVectors
    encoder = OneHotEncoderEstimator(
        inputCols=[stringIndexer.getOutputCol()], 
        outputCols=[categoricalCol + "classVec"]
    )

    # add the stage
    stages += [stringIndexer, encoder]

We then combine the categorical and numeric columns into a feature vector.

# extract the encoded categorical features from the dataframe 
# and combine with the continuous features
features = [c + "classVec" for c in categoricalColumnsToEncode] + \
                [f"_c{i}" for i in range(1, 14)]

# create the feature vector, preserving rows that contain missing data.
assembler = VectorAssembler(
    inputCols=features, 
    outputCol="features", 
    handleInvalid="keep"
)

# add the assembler to the stages of the pipeline
stages += [assembler]

Finally, we convert the dataset labels into label indices.

# convert label into indices
labelIndexer = StringIndexer(inputCol="_c0", outputCol="label")
stages += [labelIndexer]

All of the transformations that we need to perform to build a GBT now exist as stages, which we'll next evaluate in a pipeline.

preppedDF = (
    Pipeline()
    .setStages(stages)
    .fit(df)
    .transform(df)
)

Initial GBT model

We'll next perform a single split of the data into training and test sets, fit a GBT to the training data using the default parameters and evaluate the performance on the test data.

# split the data
trainData, testData = preppedDF.randomSplit([0.75, 0.25], seed = 42)

# create and train the classifier
gbtModel = GBTClassifier().fit(trainData)

# make predictions on the test data
predictions = gbtModel.transform(testData)

In order to compare this initial model to future iterations, get the AUROC score.

# get the auroc for the default model
evaluator = BinaryClassificationEvaluator(labelCol="label")
auroc = evaluator.evaluate(predictions, { evaluator.metricName: "areaUnderROC" })
print(f"Test Area Under ROC: {auroc}")
Test Area Under ROC: 0.7073676650780636

We'll also create a function to give a better sense of the model's performance.

def get_counts(predictions):
    """ Get the counts of label and prediction agreement """

    # split the dataframe into correct and incorrect predictions
    correct = predictions.filter(predictions.label == predictions.prediction)
    incorrect = predictions.filter(predictions.label != predictions.prediction)

    # get the various counts needed to calculate metrics
    true_positives_cnt = correct.filter(correct.label == 1).count()
    true_negatives_cnt = correct.filter(correct.label == 0).count()
    false_positives_cnt = incorrect.filter(incorrect.label == 0).count()
    false_negatives_cnt = incorrect.filter(incorrect.label == 1).count()

    return true_positives_cnt, true_negatives_cnt, false_positives_cnt, false_negatives_cnt

def get_scores(predictions):
    """ Get the evaluation scores """

    tp, tn, fp, fn = get_counts(predictions)

    # calculate the metrics 
    accuracy = (tp + tn) / (tp + tn + fp + fn)

    precision = tp / (tp + fp)

    recall = tp / (tp + fn)

    f1_score = 2 * (recall * precision) / (recall + precision)

    return accuracy, precision, recall, f1_score

def print_eval_metrics(predictions):
    """ Print performance metrics.

        Args:
            predictions - a DataFrame with model predictions
    """

    accuracy, precision, recall, f1_score = get_scores(predictions)

    # print the results
    print(f"Test accuracy: {round(accuracy, 3)}")
    print(f"Test precision: {round(precision, 3)}")
    print(f"Test recall: {round(recall, 3)}")
    print(f"Test F1 score: {round(f1_score, 3)}")
print_eval_metrics(predictions)
Test accuracy: 0.76
Test precision: 0.598
Test recall: 0.191
Test F1 score: 0.289
gbtModel.save("gs://w61-ucb2/data/gbtModel")

Further Model Improvements

There are a number of potential issues with out initial model. First, although we used a random split to create the training and test sets, it is always possible with a single split that there will be anomalies that cause the two sets to differ in ways that cause us to misestimate the model's performance on the test data. To address this problem, we will use 3-fold cross validation below in order to evaluate the model using 3 different splits.

Second, it is possible that the default hyperparameters are not the best hyperparameters for this data. In particular, the model above trains 20 trees (maxIter), with a height of 5 or $2^5=32$ maximum leaves (maxDepth), and uses a learning rate of 0.1. Below, we'll experiment with different permutations of these parameters and select the best model.

# specify the parameters over which to search
gbt = GBTClassifier()
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxIter, [10, 20, 30])
    .addGrid(gbt.maxDepth, [3, 5])
    .build()
)
# setup 3-fold cross-validation for each model
crossVal = CrossValidator(
    estimator=GBTClassifier(), 
    estimatorParamMaps=paramGrid,
    evaluator=BinaryClassificationEvaluator(),
    numFolds=3
)

Having initialized the objects we need, we train the models and select the one with the highest score.

cvModel = crossVal.fit(trainData)
predictions = cvModel.transform(testData)
print_eval_metrics(predictions)
Test accuracy: 0.761
Test precision: 0.603
Test recall: 0.196
Test F1 score: 0.296
evaluator.evaluate(predictions)
0.710653928067178

We see a slight improvement in the AUROC score ($0.710 - 0.707 = 0.003$), the test accuracy $(0.761 - 0.76 = 0.001)$ and the f1 score ($0.296 - 0.289 = 0.007$) with this model.

We'll now inspect the parameters that correspond to the best model.

bestModel = cvModel.bestModel
param_map = bestModel.extractParamMap()
num_trees = param_map[bestModel.getParam("maxIter")]
max_depth = param_map[bestModel.getParam("maxDepth")]

print(f"Number of trees: {num_trees}")
print(f"Maximum height of tree: {max_depth}")
Number of trees: 20
Maximum height of tree: 5

We see that the maximum depth of the tree is 5 and the number of iterations (i.e. the height of the tree) is 20. The results indicate that adding more trees resulted in overfitting the model for this dataset. The parameters for the best model in fact match those used in the default model above, so the improved scores are actually a result of using cross-validation.

# save the model for future use
cvModel.bestModel.save('gs://w61-ucb2/data/gbtModelBest')

Gradient Boost Results and Discussion

Methods to Implement in Future Work

Gradient boost techniques are a very popular machine learning implementation and have observed very good results in some of the data science competition sites. There has therefore been a number of improvements and iterations on the gradient boost techniques presented here that could aid in the development of a more accurate gradient boost model: - Handling of sparse data / matrix: As shown in the EDA section and as generally accepted with CTR data, many of the features are sparse (empty) or missing (NAN). When building a decision tree, we use an encoding routine to separate categorical features into n_unique features. This causes the feature space to get very large (especially when there are a number of categories as was the case in this dataset (1293 unique values in one feature space)). If a one-hot encoding routine is utilized this causes the feature space to become very sparse (number of zero values) as most datapoints won't contain any data. There is also the problem of training on NAN values which isn't possible. In this example we either: (a) imputed the mean of a numeric vector and used that to fill NAN values (b) assumed the value of zero for all missing data in categorical features. One common method for addressing the sparsity issue is to encode a category into a series of bins rather than one-hot encoding. In this CTR data our categorical features were hash values rather than the original dataset. Because of this, we did not choose to employ a binning technique as we were uncertain of the algorithm to create the hash and whether there was validity in binning values based on their hash value. Given another training set where we had access to the raw value and can bin by value this is a worthy technique. XG-Boost specifically handles sparse data well by performing what the authors entitled 'sparsity-aware split finding'. The basic premise used in XG-boost is to utilize a default direction when encountering NAN or zero values for a particular feature. This default direction is determined by analyzing all missing values for that feature and determining the most appropriate direction [Chen, T., & Guestrin, C. (2016). XGBoost: A Scalable Tree Boosting System. Cornell University.]. - Alternatives to One-Hot-Encoding: We've already mentioned the potential to use binning rather than one-hot encoding. Another popular technique is to use a target statistic - such as the expected value of the output parameter based on the category value or alternatively for the bin. The technique is similar to binning except the value is replaced with a target statistic. There are a number of alternative methods to compute the target statistic that have been utilized to optimize training and prevent target leakage [Prokhorenkova, L., Gusev, G., Vorobev, A., Dorogush, A. V., & Gulin, A. (2019). CatBoost: unbiased boosting with categorical features.]. - TreeBoost: As mentioned prior to implementing gradient boost in Spark. The current library doesn't make use of separate multiplier values for each leaf node as recommended by Friedman. Implementing TreeBoost has been shown to increase the accuracy of gradient boost models - Individual Tree Optimization: Gradient boost relies on a number of weak estimators to create a good prediction in aggregate. There are a number of methods we can employ to allow our weak estimator to more accurately assess each problem and therefore optimize the ensemble: - Depth of the trees: This can lead to overfitting but can also improve individual prediction which can benefit the aggregate model - Minimum number of examples per leaf: Similar to depth of the tree, can lead to overfitting and or model improvement depending on the problem - Shrinkage: In the Spark ML implementation we chose a learning rate that tempered the adjustments made on sequential models fit to the residuals. We can further refine this value to optimize model performance - Regularization: Use L1 and L2 regularization techniques to penalize complex models. This is assessed in the loss functino by adding regularization terms to the end of the loss function corresponding to L1 and L2 regularization: $$L = loss function + \frac{1}{2} \lambda \sum w_j^2 + \alpha \sum |w_j|$$ Where $\lambda$ and $\alpha$ are the L2 and L1 regularization terms respectively.

Comments