Comparison between Pandas, Dask, and Koalas¶
[1]:
%load_ext autoreload
%autoreload 2
[2]:
import copy
import numpy as np
import pandas as pd
from pandas.testing import assert_frame_equal
from pandas.testing import assert_series_equal
from sklearn.tree import DecisionTreeClassifier
from lightgbm import LGBMClassifier
from lightgbm import DaskLGBMClassifier
import treelite
import treelite_runtime
import dill
import joblib
from sklearn.metrics import make_scorer
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import RandomizedSearchCV
from pyspark.ml.classification import RandomForestClassifier as RFCSpark
Gators imports¶
[3]:
# data cleaning
from gators.data_cleaning import (
DropColumns,
Replace,
)
# imputers
from gators.imputers import (
NumericsImputer,
ObjectImputer,
)
# encoders
from gators.encoders import (
WOEEncoder,
)
# binning
from gators.binning import (
BinRareCategories,
BinSingleTargetClassCategories,
Binning,
CustomBinning,
QuantileBinning,
TreeBinning,
)
# feature generation
from gators.feature_generation import (
PolynomialFeatures,
ElementaryArithmetics,
PolynomialObjectFeatures,
IsNull,
)
from gators.feature_generation_str import (
StringContains,
StringLength,
Extract,
SplitExtract,
)
# feature selection
from gators.feature_selection import (
SelectFromModel,
InformationValue
)
# model building
from gators.model_building import (
Model,
TrainTestSplit,
XGBBoosterBuilder,
XGBTreeliteDumper,
)
# pipeline
from gators.pipeline import Pipeline
/Users/cpoli/gators38/lib/python3.8/site-packages/xgboost/compat.py:36: FutureWarning: pandas.Int64Index is deprecated and will be removed from pandas in a future version. Use pandas.Index with the appropriate dtype instead.
from pandas import MultiIndex, Int64Index
Preprocessing pipeline¶
[4]:
# preprocessing pipeline
steps = [
('SplitExtractName', SplitExtract(['Name'], [', '], [1], ['Dummy'])),
('SplitExtractTitle', SplitExtract(['Dummy'], ['.'], [0], ['Title'])),
('StringLength', StringLength(columns=['Cabin', 'Ticket'])),
('DropColumns', DropColumns(['Name', 'Dummy', 'Cabin', 'Ticket'])),
('ObjectImputer', ObjectImputer(strategy='constant', value='MISSING')),
('BinSingleTargetClassCategories', BinSingleTargetClassCategories()),
('NumericsImputer', NumericsImputer(strategy='mean')),
('Encoder', WOEEncoder()),
]
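To make the first two steps concrete, here is a plain-pandas sketch of what they compute on the Name column (illustrative only, not gators' actual implementation):

name = pd.Series(['Braund, Mr. Owen Harris'])
# SplitExtractName: split on ', ' and keep element 1 -> 'Mr. Owen Harris'
dummy = name.str.split(', ').str[1]
# SplitExtractTitle: split on '.' and keep element 0 -> 'Mr'
title = dummy.str.split('.').str[0]

The remaining steps impute the missing values, group the categories associated with a single target class, and encode the categorical columns with weight of evidence.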
Pandas pipeline¶
[5]:
data = pd.read_parquet('data/titanic.parquet')
data = data.reset_index(drop=True)
y = data['Survived']
X = data.drop('Survived', axis=1)
train_test_split = TrainTestSplit(test_ratio=0.3, strategy='ordered')
X_train, X_test, y_train, y_test = train_test_split.transform(X, y)
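The 'ordered' strategy splits the data by row order. A minimal pandas equivalent, assuming (based on the strategy name, not gators' exact implementation) that the first 70% of rows become the train set:

cutoff = int(len(X) * (1 - 0.3))
X_train_ref, X_test_ref = X.iloc[:cutoff], X.iloc[cutoff:]
y_train_ref, y_test_ref = y.iloc[:cutoff], y.iloc[cutoff:]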
[6]:
model = LGBMClassifier(random_state=0)
steps_pd = steps + [('Model', Model(model=model))]
pipe = Pipeline(steps=steps_pd, verbose=False)
_ = pipe.fit(X_train, y_train)
[7]:
# split the fitted pipeline into a production (preprocessing) pipeline and a model
model_pd = pipe[-1].model
prod_pipe_pd = copy.deepcopy(pipe)
_ = prod_pipe_pd.steps.pop(-1)
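With the model detached, the production pipeline and the model can be persisted separately, for example with dill (imported above). A minimal sketch, with hypothetical file names:

# hypothetical file names; adjust to your deployment layout
with open('prod_pipe_pd.dill', 'wb') as f:
    dill.dump(prod_pipe_pd, f)
with open('model_pd.dill', 'wb') as f:
    dill.dump(model_pd, f)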
[8]:
X_train_prepro_pd = prod_pipe_pd.transform(X_train)
X_test_prepro_pd = prod_pipe_pd.transform(X_test)
y_test_pred_proba_pd = model_pd.predict_proba(X_test_prepro_pd.to_numpy())[:, 1]
Dask pipeline¶
[9]:
import dask.dataframe as dd
import dask.distributed
client = dask.distributed.Client(n_workers=4)
client
[9]:
Client: Client-e981a354-43e7-11ed-991c-acde48001122
Connection method: Cluster object | Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status
LocalCluster c57805ca: status running | 4 workers | 16 total threads | 64.00 GiB total memory
[10]:
data_dd = dd.read_parquet('data/titanic.parquet')
data_dd = data_dd.reset_index(drop=True)
y_dd = data_dd['Survived']
X_dd = data_dd.drop('Survived', axis=1)
n_partitions = 4
train_test_split = TrainTestSplit(test_ratio=0.3, strategy='ordered')
X_train_dd, X_test_dd, y_train_dd, y_test_dd = train_test_split.transform(X_dd, y_dd)
# repartition, then persist the data on the cluster so that
# downstream steps reuse the materialized partitions
X_train_dd = client.persist(X_train_dd.repartition(npartitions=n_partitions))
X_test_dd = client.persist(X_test_dd.repartition(npartitions=n_partitions))
y_train_dd = client.persist(y_train_dd.repartition(npartitions=n_partitions))
y_test_dd = client.persist(y_test_dd.repartition(npartitions=n_partitions))
[11]:
model_dd = DaskLGBMClassifier(random_state=0, client=client, n_jobs=n_partitions)
steps_dd = steps + [('Model', Model(model=model_dd))]
pipe = Pipeline(steps=steps_dd, verbose=False)
_ = pipe.fit(X_train_dd, y_train_dd)
/Users/cpoli/gators38/lib/python3.8/site-packages/lightgbm/dask.py:525: UserWarning: Parameter n_jobs will be ignored.
_log_warning(f"Parameter {param_alias} will be ignored.")
Finding random open ports for workers
[LightGBM] [Warning] num_threads is set=4, n_jobs=-1 will be ignored. Current value: num_threads=4
[12]:
# split the fitted pipeline into a production (preprocessing) pipeline and a model
model_dd = pipe[-1].model
prod_pipe_dd = copy.deepcopy(pipe)
_ = prod_pipe_dd.steps.pop(-1)
[13]:
X_train_prepro_dd = prod_pipe_dd.transform(X_train_dd)
X_test_prepro_dd = prod_pipe_dd.transform(X_test_dd)
y_test_pred_proba_dd = model_dd.predict_proba(X_test_prepro_dd).compute()[:, 1]
Koalas pipeline¶
[14]:
from pyspark import SparkConf, SparkContext
conf = SparkConf()
# memory allocated to each Spark executor
conf.set('spark.executor.memory', '2g')
# disable whole-stage code generation
conf.set('spark.sql.codegen.wholeStage', 'false')
# disable automatic broadcast joins
conf.set('spark.sql.autoBroadcastJoinThreshold', -1)
SparkContext(conf=conf)
import databricks.koalas as ks
# use a default index that is computed in a distributed manner
ks.set_option('compute.default_index_type', 'distributed-sequence')
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/cpoli/gators38/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.1.3.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
22/10/04 14:24:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[15]:
data_ks = ks.read_parquet('data/titanic.parquet')
data_ks = data_ks.reset_index(drop=True)
y_ks = data_ks['Survived']
X_ks = data_ks.drop('Survived', axis=1)
train_test_split = TrainTestSplit(test_ratio=0.3, strategy='ordered')
X_train_ks, X_test_ks, y_train_ks, y_test_ks = train_test_split.transform(X_ks, y_ks)
[16]:
# RFCSpark was imported above; train a Spark random forest through the Koalas pipeline
model = RFCSpark(numTrees=15, maxDepth=3, labelCol='Survived', seed=0)
steps_ks = steps + [('Model', Model(model=model))]
pipe_ks = Pipeline(steps=steps_ks, verbose=False)
_ = pipe_ks.fit(X_train_ks, y_train_ks)
/Users/cpoli/gators38/lib/python3.8/site-packages/pyspark/sql/pandas/functions.py:389: UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for pandas UDF instead of specifying pandas UDF type which will be deprecated in the future releases. See SPARK-28264 for more details.
warnings.warn(
[17]:
# split the fitted pipeline into a production (preprocessing) pipeline and a model
# note that Koalas pipelines cannot be pickled, so the fitted pipeline is
# modified in place instead of deep-copied
model_ks = pipe_ks[-1]
prod_pipe_ks = pipe_ks
_ = pipe_ks.steps.pop(-1)
[18]:
X_train_prepro_ks = prod_pipe_ks.transform(X_train_ks)
X_test_prepro_ks = prod_pipe_ks.transform(X_test_ks)
/Users/cpoli/gators38/lib/python3.8/site-packages/pyspark/sql/pandas/functions.py:389: UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for pandas UDF instead of specifying pandas UDF type which will be deprecated in the future releases. See SPARK-28264 for more details.
warnings.warn(
[19]:
y_test_pred_proba_ks = model_ks.predict_proba(X_test_prepro_ks)
Check that the results match¶
Check that the Pandas, Dask, and Koalas results match: production pipeline¶
[20]:
assert_frame_equal(
X_train_prepro_pd,
X_train_prepro_dd.compute())
assert_frame_equal(
X_train_prepro_pd,
X_train_prepro_ks.to_pandas())
assert_frame_equal(
X_test_prepro_pd,
X_test_prepro_dd.compute())
assert_frame_equal(
X_test_prepro_pd,
X_test_prepro_ks.to_pandas())
# the LightGBM probabilities from the pandas and Dask pipelines should agree;
# the Koalas pipeline trains a different model (a Spark random forest), so its
# probabilities are not compared here
assert np.allclose(y_test_pred_proba_dd, y_test_pred_proba_pd)
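Beyond these equality checks, model quality can also be compared across backends. A minimal sketch using scikit-learn's roc_auc_score, assuming each y_test_pred_proba_* is array-like and aligned with y_test (the Koalas output may need an explicit conversion, hence the hedged np.asarray):

from sklearn.metrics import roc_auc_score
# LightGBM trained through the pandas pipeline
print('pandas AUC:', roc_auc_score(y_test, y_test_pred_proba_pd))
# distributed LightGBM trained through the Dask pipeline
print('Dask AUC:', roc_auc_score(y_test, y_test_pred_proba_dd))
# Spark random forest trained through the Koalas pipeline (a different model,
# so its score is not expected to match the two above)
print('Koalas AUC:', roc_auc_score(y_test, np.asarray(y_test_pred_proba_ks)))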