10 minutes to gators

[1]:
%load_ext autoreload
%autoreload 2

import warnings
warnings.filterwarnings('ignore')
[2]:
import copy
import numpy as np
import pandas as pd
from pandas.testing import assert_frame_equal
from xgboost import XGBClassifier
import treelite
import treelite_runtime
import dill
[3]:
# data cleaning
from gators.data_cleaning import ConvertColumnDatatype
# imputers
from gators.imputers import (
    NumericsImputer,
    ObjectImputer,
)
# encoders
from gators.encoders import WOEEncoder
# pipeline
from gators.pipeline import Pipeline
# model building
from gators.model_building import XGBBoosterBuilder

end-to-end simple workflow

The workflow is as follows:

1- create a pipeline to take care of the data preprocessing.

2- train the pipeline on a pandas or koalas dataframe.

3- generate the preprocessed data.

4- train a decision-tree-based model on the preprocessed data.

5- use treelite to compile the model to C.

The pipeline and the compiled model can then be deployed in production.

Notes:

  • koalas and/or pandas are used offline, by means of the fit and transform methods.

  • In production, numpy is used with transform_numpy.
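
A minimal sketch of this pattern, using the pipe, X, and y objects built in the cells below:

# offline: fit the pipeline and generate the training data
pipe = pipe.fit(X, y)
X_prepro = pipe.transform(X)
# production: the same fitted pipeline scores raw NumPy samples
X_prepro_np = pipe.transform_numpy(X.to_numpy())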

The pipeline will be composed of only the following four transformers:

  • ObjectImputer

  • WOEEncoder

  • ConvertColumnDatatype

  • NumericsImputer

with pandas

[4]:
columns = ['Sex', 'Age', 'SibSp', 'Parch', 'Cabin', 'Survived']
data = pd.read_parquet('data/titanic.parquet')[columns]
y = data['Survived']
X = data.drop(['Survived'], axis=1)
X.head(2)
[4]:
Sex Age SibSp Parch Cabin
0 male 22.0 1 0 None
1 female 38.0 1 0 C85

pipeline

[5]:
prepro_steps = [
    ('ObjectImputer', ObjectImputer(strategy='constant', value='MISSING')),
    ('WOEEncoder', WOEEncoder(inplace=True)),
    ('ConvertColumnDatatype', ConvertColumnDatatype(columns=['SibSp', 'Parch'], datatype=float)),
    ('NumericsImputer', NumericsImputer(strategy='mean', inplace=True)),
]
pipe = Pipeline(steps=prepro_steps)
X_prepro = pipe.fit_transform(X, y)
X_prepro_np = pipe.transform_numpy(X.to_numpy())
[6]:
X_prepro_np
[6]:
array([[-0.98142372, 22.        ,  1.        ,  0.        , -0.37441799],
       [ 1.52476689, 38.        ,  1.        ,  0.        ,  1.57080012],
       [ 1.52476689, 26.        ,  0.        ,  0.        , -0.37441799],
       ...,
       [ 1.52476689, 29.69911765,  1.        ,  2.        , -0.37441799],
       [-0.98142372, 26.        ,  0.        ,  0.        ,  1.57080012],
       [-0.98142372, 32.        ,  0.        ,  0.        , -0.37441799]])
[7]:
X_prepro.head(2)
[7]:
Sex Age SibSp Parch Cabin
0 -0.981424 22.0 1.0 0.0 -0.374418
1 1.524767 38.0 1.0 0.0 1.570800

check transform and transform_numpy output

[8]:
X_prepro_np_pd = pd.DataFrame(
    X_prepro_np,
    index=X_prepro.index,
    columns=X_prepro.columns,
)
X_prepro_np_pd.head(2)
[8]:
Sex Age SibSp Parch Cabin
0 -0.981424 22.0 1.0 0.0 -0.374418
1 1.524767 38.0 1.0 0.0 1.570800
[9]:
assert X_prepro.shape == X_prepro_np.shape
X_prepro_np_pd = pd.DataFrame(
    X_prepro_np,
    index=X_prepro.index,
    columns=X_prepro.columns,
)
assert_frame_equal(X_prepro, X_prepro_np_pd)

model building

[10]:
model = XGBClassifier(
    max_depth=2,
    n_estimators=10,
    random_state=0,
    eval_metric='mlogloss',
    use_label_encoder=False)

pandas model

[11]:
model_pd = copy.copy(model)
_ = model_pd.fit(X_prepro, y)

numpy model

[12]:
model = model.fit(X_prepro.to_numpy(), y.to_numpy())

treelite model

[13]:
xgb_booster = XGBBoosterBuilder.train(
    model=model,
    X_train=X_prepro_np,
    y_train=y.to_numpy(),
)

treelite_model = treelite.Model.from_xgboost(xgb_booster)
treelite_model.export_lib(
    toolchain='clang',
    libpath='./models/treelite_simple_xgb.so',
    params={'parallel_comp': 4},
    verbose=True)
model_tl = treelite_runtime.Predictor(
    './models/treelite_simple_xgb.so', verbose=False)
[16:09:44] ../src/compiler/ast_native.cc:45: Using ASTNativeCompiler
[16:09:44] ../src/compiler/ast/split.cc:31: Parallel compilation enabled; member trees will be divided into 4 translation units.
[16:09:44] ../src/c_api/c_api.cc:121: Code generation finished. Writing code to files...
[16:09:44] ../src/c_api/c_api.cc:126: Writing file recipe.json...
[16:09:44] ../src/c_api/c_api.cc:126: Writing file tu3.c...
[16:09:44] ../src/c_api/c_api.cc:126: Writing file tu2.c...
[16:09:44] ../src/c_api/c_api.cc:126: Writing file tu1.c...
[16:09:44] ../src/c_api/c_api.cc:126: Writing file tu0.c...
[16:09:44] ../src/c_api/c_api.cc:126: Writing file header.h...
[16:09:44] ../src/c_api/c_api.cc:126: Writing file main.c...
[16:09:44] /Users/cpoli/gators38/lib/python3.8/site-packages/treelite/contrib/util.py:105: Compiling sources files in directory ./models/tmppmwz3168 into object files (*.o)...
[16:09:44] /Users/cpoli/gators38/lib/python3.8/site-packages/treelite/contrib/util.py:134: Generating dynamic shared library ./models/tmppmwz3168/predictor.dylib...
[16:09:44] /Users/cpoli/gators38/lib/python3.8/site-packages/treelite/contrib/__init__.py:278: Generated shared library in 0.40 seconds

per-sample model benchmarking

[14]:
x = X.iloc[[0]]
x_np = x.to_numpy()
stats_pd = %timeit -o model_pd.predict_proba(pipe.transform(x))[0][1]
stats_pd_tl = %timeit -o model_tl.predict(treelite_runtime.DMatrix(pipe.transform(x).to_numpy()))
stats_np = %timeit -o model.predict_proba(pipe.transform_numpy(x_np.copy()))[0][1]
stats_np_tl = %timeit -o model_tl.predict(treelite_runtime.DMatrix(pipe.transform_numpy(x_np.copy())))
13.7 ms ± 1.13 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)
6.85 ms ± 230 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
1.48 ms ± 108 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
67.6 µs ± 2.27 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

Overall speed-up

[15]:
speedup = 1e3 * float(str(stats_pd).split(' ')[0]) / float(str(stats_np_tl).split(' ')[0])
f'Speed-up Pandas VS Numpy&Treelite x{round(speedup)}'
[15]:
'Speed-up Pandas VS Numpy&Treelite x203'
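
Notes:

Parsing the printed string depends on the display units (ms for stats_pd vs µs for stats_np_tl, hence the 1e3 factor above). The TimeitResult object returned by %timeit -o also exposes the timings in seconds via its average attribute, so a unit-safe alternative is:

speedup = stats_pd.average / stats_np_tl.average
f'Speed-up Pandas VS Numpy&Treelite x{round(speedup)}'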

check model predictions

[16]:
X_np = X.to_numpy()
y_pred_pd = model_pd.predict_proba(pipe.transform(X))[:, 1]
y_pred_np = model.predict_proba(pipe.transform_numpy(X_np.copy()))[:, 1]
y_pred_tl = model_tl.predict(
    treelite_runtime.DMatrix(
        pipe.transform_numpy(X_np.copy()))
    )
assert np.allclose(y_pred_np, y_pred_pd)
assert np.allclose(y_pred_np, y_pred_tl)

dumping both model and pipeline

[17]:
model_path = 'models/simple_xgb.dill'
with open(model_path, 'wb') as file:
    dill.dump(model, file)
pipeline_path = 'models/simple_pipeline.dill'
with open(pipeline_path, 'wb') as file:
    dill.dump(pipe, file)
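
In production, the dumped artifacts can be restored with dill.load; a minimal sketch, where pipe_prod and model_prod are illustrative names:

with open('models/simple_pipeline.dill', 'rb') as file:
    pipe_prod = dill.load(file)
with open('models/simple_xgb.dill', 'rb') as file:
    model_prod = dill.load(file)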

with koalas

[18]:
from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set('spark.executor.memory', '2g')
conf.set('spark.sql.codegen.wholeStage', 'false')
conf.set('spark.sql.autoBroadcastJoinThreshold', -1)
SparkContext(conf=conf)
import databricks.koalas as ks
ks.set_option('compute.default_index_type', 'distributed-sequence')
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/cpoli/gators38/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.1.3.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
22/10/05 16:10:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[19]:
import databricks.koalas as ks
from gators.converter import ToPandas
[20]:
data = ks.read_parquet('data/titanic.parquet')[columns]
y_ks = data['Survived']
X_ks = data.drop(['Survived'], axis=1)
X_ks.head(2)

[20]:
Sex Age SibSp Parch Cabin
0 male 22.0 1 0 None
1 female 38.0 1 0 C85
[21]:
X_prepro_ks = pipe.fit_transform(X_ks, y_ks)
X_prepro_ks_np = pipe.transform_numpy(X.to_numpy())

[22]:
X_prepro_ks_pd, y_ks_pd = ToPandas().transform(X_prepro_ks, y_ks)
X_prepro_ks_pd.index = X_prepro.index
[23]:
assert_frame_equal(X_prepro_ks_pd, X_prepro)
assert np.allclose(X_prepro_ks_np, X_prepro)

Since we have a pandas dataframe, the same steps as in the pandas section can now be followed.
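
For example, the model-training step can be reproduced on the converted dataframe (a sketch reusing the objects defined above; model_ks is an illustrative name):

model_ks = copy.copy(model)
_ = model_ks.fit(X_prepro_ks_pd, y_ks_pd)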

Create your own transformers: example with log10

imports

[24]:
from typing import List, Union
from math import log10
import numpy as np
import pandas as pd
import databricks.koalas as ks
from gators.util import util
from gators.transformers import Transformer

Inplace transformer on the whole dataframe

[25]:
class Log10Inplace(Transformer):
    def __init__(self):
        pass

    def fit(self,
            X: Union[pd.DataFrame, ks.DataFrame],
            y: Union[pd.Series, ks.Series] = None) -> 'Log10Inplace':
        self.check_dataframe(X)
        return self

    def transform(
        self, X: Union[pd.DataFrame, ks.DataFrame]
    ) -> Union[pd.DataFrame, ks.DataFrame]:
        self.check_dataframe(X)
        return X.applymap(log10)

    def transform_numpy(self, X: np.ndarray) -> np.ndarray:
        self.check_array(X)
        return np.log10(X)

Notes:

If your use case does not need koalas, the transform method can be replaced by:

return pd.DataFrame(np.log10(X.to_numpy()), columns=X.columns, index=X.index)

which is significantly faster.
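
For example, a pandas-only version of the class could define transform as follows (a sketch, dropping the koalas type hints):

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        self.check_dataframe(X)
        return pd.DataFrame(
            np.log10(X.to_numpy()), columns=X.columns, index=X.index)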

[26]:
X = pd.DataFrame(
    np.abs(np.random.randn(10, 10)), columns=list('ABCDEFGHIJ'))
%timeit _ = X.applymap(log10)
%timeit _ = pd.DataFrame(np.log10(X.to_numpy()), columns=X.columns, index=X.index)
1.41 ms ± 30.4 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
30.1 µs ± 238 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

Inplace transformer on the selected columns

Transforming only the given columns can be useful in the following cases:

  • Only a few columns need to be transformed.

  • Only a given datatype should be transformed.

  • The transformation should not be applied to the encoded columns, and the names of the base columns are obtained before the transformation.

[27]:
class Log10ColumnsInplace(Transformer):
    def __init__(self, columns: List[str]):
        if not isinstance(columns, list):
            raise TypeError('`columns` should be a list.')
        if not columns:
            raise ValueError('`columns` should not be empty.')
        self.columns = columns

    def fit(self,
            X: Union[pd.DataFrame, ks.DataFrame],
            y: Union[pd.Series, ks.Series] = None) -> 'Log10ColumnsInplace':
        self.check_dataframe(X)
        self.idx_columns = util.get_idx_columns(
            columns=X.columns,
            selected_columns=self.columns
        )
        return self

    def transform(
        self, X: Union[pd.DataFrame, ks.DataFrame]
    ) -> Union[pd.DataFrame, ks.DataFrame]:
        self.check_dataframe(X)
        X[self.columns] = X[self.columns].applymap(log10)
        return X

    def transform_numpy(self, X: np.ndarray) -> np.ndarray:
        self.check_array(X)
        X[:, self.idx_columns] = np.log10(X[:, self.idx_columns])
        return X

transformer creating new columns

Creating new columns can be useful if:

  • the raw data is needed for other transformations.

  • the raw data still contains some meaningful predictive information.

[28]:
class Log10Columns(Transformer):
    def __init__(self, columns: List[str]):
        if not isinstance(columns, list):
            raise TypeError('`columns` should be a list.')
        if not columns:
            raise ValueError('`columns` should not be empty.')
        self.columns = columns
        self.column_names = [f'{c}__log10' for c in self.columns]

    def fit(self,
            X: Union[pd.DataFrame, ks.DataFrame],
            y: Union[pd.Series, ks.Series] = None) -> 'Log10Columns':
        self.check_dataframe(X)
        self.idx_columns = util.get_idx_columns(
            columns=X.columns,
            selected_columns=self.columns
        )
        return self

    def transform(
        self, X: Union[pd.DataFrame, ks.DataFrame]
    ) -> Union[pd.DataFrame, ks.DataFrame]:
        self.check_dataframe(X)
        X_new = X[self.columns].applymap(log10)
        X_new.columns = self.column_names
        return X.join(X_new)

    def transform_numpy(self, X: np.ndarray) -> np.ndarray:
        self.check_array(X)
        X_new = np.log10(X[:, self.idx_columns])
        return np.concatenate((X, X_new), axis=1)

tests

[29]:
X = pd.DataFrame(
    np.abs(np.random.randn(10, 10)), columns=list('ABCDEFGHIJ'))
X_np = X.to_numpy()
[30]:
columns = util.get_datatype_columns(X, float)
X_new_inplace_all = Log10Inplace().fit_transform(X.copy())
[31]:
columns = util.get_datatype_columns(X, float)
X_new_inplace_cols = Log10ColumnsInplace(
    columns=columns).fit_transform(X.copy())
[32]:
columns = util.get_datatype_columns(X, float)
X_new = Log10Columns(columns=columns).fit_transform(X)
[33]:
assert np.allclose(
    X_new_inplace_all.to_numpy(), X_new_inplace_cols.to_numpy())
cols = [
    'A__log10', 'B__log10', 'C__log10', 'D__log10', 'E__log10',
    'F__log10', 'G__log10', 'H__log10', 'I__log10', 'J__log10'
]
assert np.allclose(
    X_new_inplace_all.to_numpy(), X_new[cols].to_numpy())

per-sample benchmarking

[34]:
np.random.seed(0)
n_cols = 1000
X = pd.DataFrame(
    np.abs(np.random.randn(1, n_cols)),
    columns=[f'col{i}' for i in range(n_cols)])
X_np = X.to_numpy()
x = X.iloc[[0]]
x_np = x.to_numpy()
[35]:
columns = list(X.columns)
obj = Log10ColumnsInplace(columns=columns)
_ = obj.fit(X)
[36]:
# result saved to be compared with the transform_numpy using Cython.
x_np_new = obj.transform_numpy(x_np.copy())
[37]:
%timeit _ = obj.transform(x.copy())
%timeit _ = obj.transform_numpy(x_np.copy())
234 ms ± 4.38 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
16.1 µs ± 108 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)

Notes:

Since the transformation happens in place, the .copy() is necessary; however, its runtime is negligible:

[38]:
%timeit x.copy()
%timeit x_np.copy()
18.4 µs ± 876 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
500 ns ± 16 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each)
[39]:
columns = list(X.columns)
obj = Log10Columns(columns=columns)
_ = obj.fit(X)
[40]:
%timeit _ = obj.transform(x.copy())
stats_log_np = %timeit -o obj.transform_numpy(x_np.copy())
109 ms ± 7.99 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
14.7 µs ± 1.71 µs per loop (mean ± std. dev. of 7 runs, 100,000 loops each)

Cython

The per-sample runtime of transform_numpy is already quite good, but in some cases Cython can make it even faster.

[41]:
%load_ext Cython
[42]:
%%cython
import cython
import numpy as np
cimport numpy as np
from libc.math cimport log10


@cython.boundscheck(False)
@cython.wraparound(False)
cpdef np.ndarray[np.float64_t, ndim=2] cython_log10(
        np.ndarray[np.float64_t, ndim=2] X,
        np.ndarray[np.int64_t, ndim=1] idx_columns,
):
    # apply log10 in place, restricted to the selected columns
    cdef int i
    cdef int j
    cdef int n_rows = X.shape[0]
    cdef int n_cols = idx_columns.shape[0]
    with nogil:
        for i in range(n_rows):
            for j in range(n_cols):
                X[i, idx_columns[j]] = log10(X[i, idx_columns[j]])
    return X
[43]:
class Log10ColumnsInplaceWithCython(Transformer):
    def __init__(self, columns: List[str]):
        if not isinstance(columns, list):
            raise TypeError('`columns` should be a list.')
        if not columns:
            raise ValueError('`columns` should not be empty.')
        self.columns = columns

    def fit(self, X, y=None) -> 'Log10ColumnsInplaceWithCython':
        self.check_dataframe(X)
        self.idx_columns = util.get_idx_columns(
            columns=X.columns,
            selected_columns=self.columns
        )
        return self

    def transform(self, X):
        self.check_dataframe(X)
        X[self.columns] = X[self.columns].applymap(log10)
        return X

    def transform_numpy(self, X):
        self.check_array(X)
        # cython_log10 modifies X in place and returns it
        return cython_log10(X, self.idx_columns)
[45]:
columns = list(X.columns)
obj = Log10ColumnsInplaceWithCython(columns=columns)
_ = obj.fit(X)
[46]:
assert np.allclose(obj.transform_numpy(x_np.copy()), x_np_new)
[47]:
stats_log_cy = %timeit -o obj.transform_numpy(x_np.copy())
9.95 µs ± 284 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
[48]:
speedup = float(str(stats_log_np).split(' ')[0]) / float(str(stats_log_cy).split(' ')[0])
f'Speed-up Cython VS Numpy x{round(speedup, 2)}'
[48]:
'Speed-up Cython VS Numpy x1.48'

A slight runtime improvement is obtained for this transformer.

Notes:

In some cases, for example the Encoders, Cython leads to a significant runtime improvement.