Comparison between Pandas, Dask, and Koalas

[1]:
%load_ext autoreload
%autoreload 2
[2]:
import copy
import numpy as np
import pandas as pd
from pandas.testing import assert_frame_equal
from pandas.testing import assert_series_equal
from sklearn.tree import DecisionTreeClassifier
from lightgbm import LGBMClassifier
from lightgbm import DaskLGBMClassifier
import treelite
import treelite_runtime
import dill
import joblib
from sklearn.metrics import make_scorer
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import RandomizedSearchCV
from pyspark.ml.classification import RandomForestClassifier as RFCSpark

Gators imports

[3]:
# data cleaning
from gators.data_cleaning import (
    DropColumns,
    Replace,
)
# imputers
from gators.imputers import (
    NumericsImputer,
    ObjectImputer,
)
# encoders
from gators.encoders import (
    WOEEncoder,
)
# binning
from gators.binning import (
    BinRareCategories,
    BinSingleTargetClassCategories,
    Binning,
    CustomBinning,
    QuantileBinning,
    TreeBinning,
)
# feature generation
from gators.feature_generation import (
    PolynomialFeatures,
    ElementaryArithmetics,
    PolynomialObjectFeatures,
    IsNull,
)
from gators.feature_generation_str import (
    StringContains,
    StringLength,
    Extract,
    SplitExtract,
)
# feature selection
from gators.feature_selection import (
    SelectFromModel,
    InformationValue
)
# model building
from gators.model_building import (
    Model,
    TrainTestSplit,
    XGBBoosterBuilder,
    XGBTreeliteDumper,
)
# pipeline
from gators.pipeline import Pipeline
/Users/cpoli/gators38/lib/python3.8/site-packages/xgboost/compat.py:36: FutureWarning: pandas.Int64Index is deprecated and will be removed from pandas in a future version. Use pandas.Index with the appropriate dtype instead.
  from pandas import MultiIndex, Int64Index

Preprocessing pipeline

[4]:
# preprocessing pipeline
steps = [
    ('SplitExtractName', SplitExtract(['Name'], [', '], [1], ['Dummy'])),
    ('SplitExtractTitle', SplitExtract(['Dummy'], ['.'], [0], ['Title'])),
    ('StringLength', StringLength(columns=['Cabin', 'Ticket'])),
    ('DropColumns', DropColumns(['Name', 'Dummy', 'Cabin', 'Ticket'])),
    ('ObjectImputer', ObjectImputer(strategy='constant', value='MISSING')),
    ('BinSingleTargetClassCategories', BinSingleTargetClassCategories()),
    ('NumericsImputer', NumericsImputer(strategy='mean')),
    ('Encoder', WOEEncoder()),
]
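As an illustration of the first two steps (a minimal sketch, not part of the original run, assuming fit/transform can be called without y for these string transformers): SplitExtract splits a string column on a delimiter and stores the element at the given index in a new column, so chaining two of them extracts the passenger title from 'Name'.

# hypothetical toy example of the SplitExtract chain defined above
toy = pd.DataFrame({'Name': ['Braund, Mr. Owen Harris']})
step1 = SplitExtract(['Name'], [', '], [1], ['Dummy'])  # -> 'Mr. Owen Harris'
step2 = SplitExtract(['Dummy'], ['.'], [0], ['Title'])  # -> 'Mr'
toy = step1.fit(toy).transform(toy)
toy = step2.fit(toy).transform(toy)
print(toy['Title'].iloc[0])  # expected: 'Mr'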

Pandas pipeline

[5]:
data = pd.read_parquet('data/titanic.parquet')
data = data.reset_index(drop=True)
y = data['Survived']
X = data.drop('Survived', axis=1)
train_test_split = TrainTestSplit(test_ratio=0.3, strategy='ordered')
X_train, X_test, y_train, y_test = train_test_split.transform(X, y)
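The 'ordered' strategy splits on row position instead of sampling at random, so the same rows land in train and test for every backend below; that is what makes the cross-backend comparisons at the end of the notebook meaningful. A quick illustrative check (not in the original run, assuming the ordered split preserves the RangeIndex created by reset_index):

print(X_train.shape, X_test.shape)
assert X_train.index.max() < X_test.index.min()  # row order is preserved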
[6]:
model = LGBMClassifier(random_state=0)
steps_pd = steps + [('Model', Model(model=model))]

pipe = Pipeline(steps=steps_pd, verbose=False)
_ = pipe.fit(X_train, y_train)
[7]:
# split prod pipeline and model
model_pd = pipe[-1].model
prod_pipe_pd = copy.deepcopy(pipe)
_ = prod_pipe_pd.steps.pop(-1)
[8]:
X_train_prepro_pd = prod_pipe_pd.transform(X_train)
X_test_prepro_pd = prod_pipe_pd.transform(X_test)
y_test_pred_proba_pd = model_pd.predict_proba(X_test_prepro_pd.to_numpy())[:, 1]
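To attach a score to the pandas pipeline, one option (an optional addition, using scikit-learn's roc_auc_score) is:

from sklearn.metrics import roc_auc_score
# AUC of the LightGBM model on the held-out 30%
print(f'AUC (pandas): {roc_auc_score(y_test, y_test_pred_proba_pd):.3f}')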

Dask pipeline

[9]:
import dask.dataframe as dd
import dask.distributed
client = dask.distributed.Client(n_workers=4)
client
[9]:
Client: Client-e981a354-43e7-11ed-991c-acde48001122
Connection method: Cluster object (cluster type: distributed.LocalCluster)
Dashboard: http://127.0.0.1:8787/status

[10]:
data_dd = dd.read_parquet('data/titanic.parquet')
data_dd = data_dd.reset_index(drop=True)
y_dd = data_dd['Survived']
X_dd = data_dd.drop('Survived', axis=1)

n_partitions = 4
train_test_split = TrainTestSplit(test_ratio=0.3, strategy='ordered')
X_train_dd, X_test_dd, y_train_dd, y_test_dd = train_test_split.transform(X_dd, y_dd)
X_train_dd = client.persist(X_train_dd.repartition(n_partitions))
X_test_dd = client.persist(X_test_dd.repartition(n_partitions))
y_train_dd = client.persist(y_train_dd.repartition(n_partitions))
y_test_dd = client.persist(y_test_dd.repartition(n_partitions))
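repartition fixes the number of Dask partitions, and client.persist starts the computation in the background and keeps the results in cluster memory, so the pipeline steps below do not recompute the split. An illustrative check (not in the original run):

print(X_train_dd.npartitions)  # expected: 4
print(len(X_train_dd))         # small computation against the persisted data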
[11]:
model_dd = DaskLGBMClassifier(random_state=0, client=client, n_jobs=n_partitions)
steps_dd = steps + [('Model', Model(model=model_dd))]

pipe = Pipeline(steps=steps_dd, verbose=False)
_ = pipe.fit(X_train_dd, y_train_dd)
/Users/cpoli/gators38/lib/python3.8/site-packages/lightgbm/dask.py:525: UserWarning: Parameter n_jobs will be ignored.
  _log_warning(f"Parameter {param_alias} will be ignored.")
Finding random open ports for workers
[LightGBM] [Warning] num_threads is set=4, n_jobs=-1 will be ignored. Current value: num_threads=4
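The n_jobs warnings above are expected: in distributed training LightGBM sets the thread count per worker itself (num_threads=4 here), so the estimator-level n_jobs value is ignored.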
[12]:
# split prod pipeline and model
model_dd = pipe[-1].model
prod_pipe_dd = copy.deepcopy(pipe)
_ = prod_pipe_dd.steps.pop(-1)
[13]:
X_train_prepro_dd = prod_pipe_dd.transform(X_train_dd)
X_test_prepro_dd = prod_pipe_dd.transform(X_test_dd)
y_test_pred_proba_dd = model_dd.predict_proba(X_test_prepro_dd).compute()[:, 1]
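Note that predict_proba on the Dask estimator is lazy and returns a Dask array; .compute() materializes it as a NumPy array before the positive-class column is sliced off.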

Koalas pipeline

[14]:
from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set('spark.executor.memory', '2g')
conf.set('spark.sql.codegen.wholeStage', 'false')
conf.set('spark.sql.autoBroadcastJoinThreshold', -1)
SparkContext(conf=conf)
import databricks.koalas as ks
ks.set_option('compute.default_index_type', 'distributed-sequence')
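The 'distributed-sequence' index type gives Koalas a globally sequential index, mirroring the pandas RangeIndex produced by reset_index above; this is what allows the row-by-row assert_frame_equal comparisons at the end of the notebook.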
[15]:
data_ks = ks.read_parquet('data/titanic.parquet')
data_ks = data_ks.reset_index(drop=True)
y_ks = data_ks['Survived']
X_ks = data_ks.drop('Survived', axis=1)
train_test_split = TrainTestSplit(test_ratio=0.3, strategy='ordered')
X_train_ks, X_test_ks, y_train_ks, y_test_ks = train_test_split.transform(X_ks, y_ks)

[16]:
model = RFCSpark(numTrees=15, maxDepth=3, labelCol='Survived', seed=0)
steps_ks = steps + [('Model', Model(model=model))]

pipe_ks = Pipeline(steps=steps_ks, verbose=False)
_ = pipe_ks.fit(X_train_ks, y_train_ks)
/Users/cpoli/gators38/lib/python3.8/site-packages/pyspark/sql/pandas/functions.py:389: UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for pandas UDF instead of specifying pandas UDF type which will be deprecated in the future releases. See SPARK-28264 for more details.
  warnings.warn(

[17]:
# split prod pipeline and model
# Koalas pipelines cannot be pickled, so there is no deepcopy here:
# popping the model step mutates pipe_ks in place, and prod_pipe_ks
# is another name for the same object. Note also that model_ks keeps
# the gators Model wrapper, unlike the pandas and Dask cases above.
model_ks = pipe_ks[-1]
prod_pipe_ks = pipe_ks
_ = pipe_ks.steps.pop(-1)
[18]:
X_train_prepro_ks = prod_pipe_ks.transform(X_train_ks)
X_test_prepro_ks = prod_pipe_ks.transform(X_test_ks)
[19]:
y_test_pred_proba_ks = model_ks.predict_proba(X_test_prepro_ks)

Check results match

Check that the Pandas, Dask, and Koalas results match - production pipeline

[20]:
assert_frame_equal(
    X_train_prepro_pd,
    X_train_prepro_dd.compute())
assert_frame_equal(
    X_train_prepro_pd,
    X_train_prepro_ks.to_pandas())
assert_frame_equal(
    X_test_prepro_pd,
    X_test_prepro_dd.compute())
assert_frame_equal(
    X_test_prepro_pd,
    X_test_prepro_ks.to_pandas())
assert np.allclose(y_test_pred_proba_dd, y_test_pred_proba_pd)
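The Koalas probabilities are intentionally left out of this last assertion: that pipeline trains a Spark RandomForestClassifier rather than LightGBM, so only its preprocessed features, not its predictions, are expected to match the pandas and Dask outputs.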