"""
This code is based on the solution of the team AvengersEnsmbl at
the KDD Cup 2019 AutoML track (https://github.com/jeongyoonlee/kddcup2019track2).

Details of the competition and the winners' solutions are available at
the competition website (https://www.4paradigm.com/competition/kddcup2019).
"""

import hyperopt
from hyperopt import STATUS_OK, Trials, hp, space_eval, tpe
import lightgbm as lgb
from logging import getLogger
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from xgboost import XGBModel

from ..const import RANDOM_SEED


logger = getLogger(__name__)
SAMPLE_SIZE = 10000
VALID_SIZE = .2


def sample_data(X, y, nrows, shuffle=True, random_state=None):
    """Sample data by rows.

    Args:
        X (pandas.DataFrame): features
        y (pandas.Series): labels
        nrows (int): the number of rows to be sampled
        shuffle (bool): whether to shuffle the data before sampling or not
        random_state (None, int, or numpy.random.RandomState): random seed or a RandomState instance

    Returns:
        a tuple of:

          - X_s (pandas.DataFrame): sampled features
          - y_s (pandas.Series): sampled labels
    """
    if X.shape[0] > nrows:
        if shuffle:
            X_s = X.sample(nrows, random_state=random_state).copy()
            y_s = y[X_s.index].copy()
        else:
            X_s = X.iloc[-nrows:].copy()
            y_s = y.iloc[-nrows:].copy()
    else:
        X_s = X.copy()
        y_s = y.copy()

    return X_s, y_s
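
# A minimal usage sketch of sample_data() on synthetic data; the frame, column
# names, and sizes below are made up for illustration:
#
#     >>> import numpy as np
#     >>> import pandas as pd
#     >>> X = pd.DataFrame(np.random.rand(100000, 3), columns=['c1', 'c2', 'c3'])
#     >>> y = pd.Series(np.random.rand(100000))
#     >>> X_s, y_s = sample_data(X, y, nrows=SAMPLE_SIZE, random_state=42)
#     >>> X_s.shape, y_s.shape
#     ((10000, 3), (10000,))
#
# With shuffle=False, the last `nrows` rows are returned instead, which suits
# time-ordered data.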


class BaseAutoML(object):
    """Base optimized regressor class."""

    def __init__(self, params, space, n_est=500, n_stop=10, sample_size=SAMPLE_SIZE,
                 valid_size=VALID_SIZE, shuffle=True, feature_selection=True, n_fs=10,
                 fs_th=0., fs_pct=.0, hyperparam_opt=True, n_hpopt=100, minimize=True,
                 n_random_col=10, random_state=RANDOM_SEED):
        """Initialize an optimized regressor class object.

        Args:
            params (dict): default parameters for a regressor
            space (dict): parameter space for hyperopt to explore
            n_est (int): the number of iterations for a regressor
            n_stop (int): early stopping rounds for a regressor
            sample_size (int): the number of samples for feature selection and parameter search
            valid_size (float): the fraction of samples reserved for validation during feature
                selection and/or hyperparameter tuning
            shuffle (bool): if True, use random sampling for sampling and the training/validation
                split. Otherwise, the last sample_size and valid_size samples are used.
            feature_selection (bool): whether to select features
            n_fs (int): the number of iterations for feature selection
            fs_th (float): the feature importance threshold. Features with importances higher
                than it will be selected.
            fs_pct (float): the feature importance percentile. Features with importances higher
                than the bottom fs_pct percentile of random features will be selected.
            hyperparam_opt (bool): whether to search optimal parameters
            n_hpopt (int): the number of iterations for hyperparameter optimization
            minimize (bool): whether a lower metric is better
            n_random_col (int): the number of random columns to be added for feature selection
            random_state (None, int, or numpy.random.RandomState): random seed or a RandomState
                instance
        """
        self.params = params
        self.space = space
        # remove fixed parameters from the search space so they are not tuned
        for param in [p for p in params if p in self.space]:
            del self.space[param]

        self.n_est = n_est
        self.n_stop = n_stop
        self.n_fs = n_fs
        self.n_hpopt = n_hpopt
        self.sample_size = sample_size
        self.valid_size = valid_size
        self.shuffle = shuffle
        self.feature_selection = feature_selection
        self.fs_th = fs_th
        self.fs_pct = fs_pct
        self.hyperparam_opt = hyperparam_opt
        if minimize:
            self.loss_sign = 1
        else:
            self.loss_sign = -1

        self.n_random_col = n_random_col

        if random_state is None or isinstance(random_state, int):
            self.random_state = np.random.RandomState(random_state)
        elif isinstance(random_state, np.random.RandomState):
            self.random_state = random_state
        else:
            raise ValueError('Invalid input for random_state: {}'.format(random_state))

        self.n_best = -1
        self.model = None
        self.features = []

    def tune(self, X, y):
        """Tune the regressor with feature selection and parameter search.

        Args:
            X (pandas.DataFrame): features
            y (pandas.Series): labels

        Returns:
            self
        """
        if self.feature_selection or self.hyperparam_opt:
            X_s, y_s = sample_data(X, y, self.sample_size, shuffle=self.shuffle,
                                   random_state=self.random_state)

        if self.feature_selection:
            self.features = self.select_features(X_s, y_s)
            logger.info(f'selecting top {len(self.features)} out of {X.shape[1]} features')
        else:
            self.features = X.columns.tolist()

        if self.hyperparam_opt:
            logger.info('hyper-parameter tuning')
            hyperparams, trials = self.optimize_hyperparam(X_s[self.features].values,
                                                           y_s.values, n_eval=self.n_hpopt)

            self.params.update(hyperparams)
            self.n_best = trials.best_trial['result']['model'].best_iteration
            logger.info(f'best parameters: {self.params}')
            logger.info(f'best iterations: {self.n_best}')

        return self

    @staticmethod
    def get_feature_importance(model):
        raise NotImplementedError

    def feature_importance(self):
        raise NotImplementedError

    def select_features(self, X, y):
        """Select features based on feature importances.

        It adds self.n_random_col random columns to features and trains the
        regressor for n_fs rounds. Features with importances above the fs_pct
        percentile of the random columns' importances (and above fs_th) in the
        best model are selected.

        Args:
            X (pandas.DataFrame): features
            y (pandas.Series): labels

        Returns:
            (list of str): the list of selected features
        """
        random_cols = []
        # add random columns as an importance baseline for real features
        for i in range(1, self.n_random_col + 1):
            random_col = '__random_{}__'.format(i)
            X[random_col] = self.random_state.rand(X.shape[0])
            random_cols.append(random_col)

        _, trials = self.optimize_hyperparam(X.values, y.values, n_eval=self.n_fs)

        feature_importances = self.get_feature_importance(trials.best_trial['result']['model'])
        imp = pd.DataFrame({'feature_importances': feature_importances,
                            'feature_names': X.columns.tolist()})
        imp = imp.sort_values('feature_importances', ascending=False).drop_duplicates()

        if len(random_cols) == 0:
            imp = imp[imp['feature_importances'] > self.fs_th]
        else:
            imp_random = imp.loc[imp.feature_names.isin(random_cols), 'feature_importances'].values
            th = max(np.percentile(imp_random, self.fs_pct * 100), self.fs_th)
            logger.debug(f'feature importance (th={th:.2f}):\n{imp}')
            imp = imp[(imp.feature_importances > th) & ~(imp.feature_names.isin(random_cols))]

        return imp['feature_names'].tolist()

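    # Illustrative note on select_features() above (numbers made up): with
    # fs_pct=.1 and ten random columns whose importances happen to be
    # [0.0, 0.1, ..., 0.9], the threshold is
    # max(np.percentile(imp_random, 10), fs_th) ~= 0.09, so a real feature
    # must beat roughly the weakest decile of the random probes to survive.
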
    def optimize_hyperparam(self, X, y, test_size=.2, n_eval=100):
        raise NotImplementedError

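# Subclasses of BaseAutoML implement get_feature_importance(),
# feature_importance(), and optimize_hyperparam(), as AutoXGB and AutoLGB do
# below. A skeletal sketch of the contract (the CatBoost name is hypothetical,
# for illustration only):
#
#     class AutoCB(BaseAutoML):
#
#         @staticmethod
#         def get_feature_importance(model):
#             # return per-feature importances of a trained booster
#             ...
#
#         def optimize_hyperparam(self, X, y, test_size=.2, n_eval=100):
#             # hold out test_size of the data, run hyperopt for n_eval trials,
#             # and return (best_hyperparams, trials). Each trial's result dict
#             # must carry the trained model under the 'model' key so that
#             # tune() and select_features() can read best_iteration and
#             # feature importances from trials.best_trial['result']['model'].
#             ...

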
class AutoXGB(BaseAutoML):

    params = {'random_state': RANDOM_SEED,
              'n_jobs': -1}

    space = {
        "learning_rate": hp.loguniform("learning_rate", np.log(0.01), np.log(0.3)),
        "max_depth": hp.choice("max_depth", [6, 8, 10]),
        "colsample_bytree": hp.quniform("colsample_bytree", .5, .9, 0.1),
        "subsample": hp.quniform("subsample", .5, .9, 0.1),
        "min_child_weight": hp.choice('min_child_weight', [10, 25, 100]),
    }

    def __init__(self, objective='reg:linear', metric='rmse', boosting='gbtree', params=params,
                 space=space, n_est=500, n_stop=10, sample_size=SAMPLE_SIZE,
                 feature_selection=True, n_fs=10, fs_th=1e-5, fs_pct=.1, hyperparam_opt=True,
                 n_hpopt=100, n_random_col=10, random_state=RANDOM_SEED, shuffle=True):
        self.metric, minimize = self._get_metric_alias_minimize(metric)

        self.params.update(params)
        self.params.update({'objective': objective,
                            'booster': boosting})

        super(AutoXGB, self).__init__(params=self.params, space=space, n_est=n_est,
                                      n_stop=n_stop, sample_size=sample_size,
                                      feature_selection=feature_selection, n_fs=n_fs,
                                      fs_th=fs_th, fs_pct=fs_pct,
                                      hyperparam_opt=hyperparam_opt, n_hpopt=n_hpopt,
                                      minimize=minimize, n_random_col=n_random_col,
                                      random_state=random_state, shuffle=shuffle)

    @staticmethod
    def _get_metric_alias_minimize(metric):
        """Get the XGBoost metric alias and whether to minimize it.

        As defined at https://xgboost.readthedocs.io/en/latest/parameter.html#learning-task-parameters

        Args:
            metric (str): a metric name

        Returns:
            (tuple):

                - (str): the standard metric name for XGBoost
                - (bool): a flag whether to minimize or maximize the metric
        """
        assert metric in ['rmse', 'rmsle', 'mae', 'logloss', 'error', 'merror', 'mlogloss',
                          'auc', 'aucpr', 'ndcg', 'map', 'poisson-nloglik', 'gamma-nloglik',
                          'cox-nloglik', 'gamma-deviance',
                          'tweedie-nloglik'], 'Invalid metric: {}'.format(metric)

        if metric in ['auc', 'aucpr', 'ndcg', 'map']:
            minimize = False
        else:
            minimize = True

        return metric, minimize

    @staticmethod
    def get_feature_importance(model):
        return model.feature_importances_

    def feature_importance(self):
        return self.model.feature_importances_

    def optimize_hyperparam(self, X, y, test_size=.2, n_eval=100):
        X_trn, X_val, y_trn, y_val = train_test_split(X, y, test_size=test_size,
                                                      shuffle=self.shuffle)

        def objective(hyperparams):
            model = XGBModel(n_estimators=self.n_est, **self.params, **hyperparams)
            model.fit(X=X_trn, y=y_trn,
                      eval_set=[(X_val, y_val)],
                      eval_metric=self.metric,
                      early_stopping_rounds=self.n_stop,
                      verbose=False)
            score = (model.evals_result()['validation_0'][self.metric][model.best_iteration]
                     * self.loss_sign)

            return {'loss': score, 'status': STATUS_OK, 'model': model}

        trials = Trials()
        best = hyperopt.fmin(fn=objective, space=self.space, trials=trials, algo=tpe.suggest,
                             max_evals=n_eval, verbose=1, rstate=self.random_state)

        hyperparams = space_eval(self.space, best)
        return hyperparams, trials

    def fit(self, X, y):
        self.model = XGBModel(n_estimators=self.n_best, **self.params)
        self.model.fit(X=X[self.features], y=y, eval_metric=self.metric, verbose=False)
        return self

    def predict(self, X):
        return self.model.predict(X[self.features])

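# A hedged end-to-end sketch of AutoXGB on synthetic regression data; the
# column names, sizes, and targets below are made up for illustration:
#
#     >>> import numpy as np
#     >>> import pandas as pd
#     >>> X = pd.DataFrame(np.random.rand(20000, 10),
#     ...                  columns=[f'x{i}' for i in range(10)])
#     >>> y = X['x0'] * 2 + np.random.rand(20000)
#     >>> model = AutoXGB(objective='reg:linear', metric='rmse', n_hpopt=10)
#     >>> model = model.tune(X, y)   # feature selection + hyperparameter search
#     >>> model = model.fit(X, y)    # refit with the best iteration count
#     >>> p = model.predict(X)       # uses only the selected features

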
class AutoLGB(BaseAutoML):

    params = {
        "bagging_freq": 1,
        "verbosity": -1,
        "seed": RANDOM_SEED,
        "num_threads": -1,
        "feature_pre_filter": False,
    }

    space = {
        "learning_rate": hp.loguniform("learning_rate", np.log(0.01), np.log(0.3)),
        "num_leaves": hp.choice("num_leaves", [15, 31, 63, 127, 255]),
        "max_depth": hp.choice("max_depth", [-1, 4, 6, 8, 10]),
        "feature_fraction": hp.quniform("feature_fraction", .5, .9, 0.1),
        "bagging_fraction": hp.quniform("bagging_fraction", .5, .9, 0.1),
        "min_child_samples": hp.choice('min_child_samples', [10, 25, 100]),
        "lambda_l1": hp.choice('lambda_l1', [0, .1, 1, 10]),
        "lambda_l2": hp.choice('lambda_l2', [0, .1, 1, 10]),
    }

    def __init__(self, objective='regression', metric='mae', boosting='gbdt', params=params,
                 space=space, n_est=500, n_stop=10, sample_size=SAMPLE_SIZE,
                 feature_selection=True, n_fs=10, fs_th=1e-5, fs_pct=.1, hyperparam_opt=True,
                 n_hpopt=100, n_random_col=10, random_state=RANDOM_SEED, shuffle=True):
        self.metric, minimize = self._get_metric_alias_minimize(metric)

        self.params.update(params)
        self.params.update({'objective': objective,
                            'metric': self.metric,
                            'boosting': boosting})

        super(AutoLGB, self).__init__(params=self.params, space=space, n_est=n_est,
                                      n_stop=n_stop, sample_size=sample_size,
                                      feature_selection=feature_selection, n_fs=n_fs,
                                      fs_th=fs_th, fs_pct=fs_pct,
                                      hyperparam_opt=hyperparam_opt, n_hpopt=n_hpopt,
                                      minimize=minimize, n_random_col=n_random_col,
                                      random_state=random_state, shuffle=shuffle)

    @staticmethod
    def _get_metric_alias_minimize(metric):
        """Get the LightGBM metric alias and whether to minimize it.

        As defined at https://lightgbm.readthedocs.io/en/latest/Parameters.html

        Args:
            metric (str): a metric name

        Returns:
            (tuple):

                - (str): the standard metric name for LightGBM
                - (bool): a flag whether to minimize or maximize the metric
        """
        if metric in ['l1', 'l2', 'rmse', 'quantile', 'mape', 'huber', 'fair', 'poisson',
                      'gamma', 'gamma_deviance', 'tweedie', 'ndcg', 'map', 'auc',
                      'binary_logloss', 'binary_error', 'multi_logloss', 'multi_error',
                      'cross_entropy', 'cross_entropy_lambda', 'kullback_leibler']:
            pass
        elif metric in ['mae', 'mean_absolute_error', 'regression_l1']:
            metric = 'l1'
        elif metric in ['mean_squared_error', 'mse', 'regression_l2', 'regression']:
            metric = 'l2'
        elif metric in ['root_mean_squared_error', 'l2_root']:
            metric = 'rmse'
        elif metric in ['mean_absolute_percentage_error']:
            metric = 'mape'
        elif metric in ['lambdarank']:
            metric = 'ndcg'
        elif metric in ['mean_average_precision']:
            metric = 'map'
        elif metric in ['binary']:
            metric = 'binary_logloss'
        elif metric in ['multiclass', 'softmax', 'multiclassova', 'multiclass_ova', 'ova', 'ovr']:
            metric = 'multi_logloss'
        elif metric in ['xentropy']:
            metric = 'cross_entropy'
        elif metric in ['xentlambda']:
            metric = 'cross_entropy_lambda'
        elif metric in ['kldiv']:
            metric = 'kullback_leibler'
        else:
            raise ValueError('{} is not a valid metric. See '
                             'https://lightgbm.readthedocs.io/en/latest/Parameters.html '
                             'for the full list of metrics available.'.format(metric))

        if metric in ['auc', 'ndcg', 'map']:
            minimize = False
        else:
            minimize = True

        return metric, minimize

    @staticmethod
    def get_feature_importance(model):
        return model.feature_importance(importance_type='gain')

    def feature_importance(self):
        return self.model.feature_importance(importance_type='gain')

    def optimize_hyperparam(self, X, y, test_size=.2, n_eval=100):
        X_trn, X_val, y_trn, y_val = train_test_split(X, y, test_size=test_size,
                                                      shuffle=self.shuffle)

        train_data = lgb.Dataset(X_trn, label=y_trn)
        valid_data = lgb.Dataset(X_val, label=y_val)

        def objective(hyperparams):
            model = lgb.train({**self.params, **hyperparams}, train_data, self.n_est,
                              valid_data, early_stopping_rounds=self.n_stop, verbose_eval=0)

            score = model.best_score["valid_0"][self.metric] * self.loss_sign

            return {'loss': score, 'status': STATUS_OK, 'model': model}

        trials = Trials()
        best = hyperopt.fmin(fn=objective, space=self.space, trials=trials, algo=tpe.suggest,
                             max_evals=n_eval, verbose=1, rstate=self.random_state)

        hyperparams = space_eval(self.space, best)
        return hyperparams, trials

    def fit(self, X, y):
        train_data = lgb.Dataset(X[self.features], label=y)
        self.model = lgb.train(self.params, train_data, self.n_best, verbose_eval=100)
        return self

    def predict(self, X):
        return self.model.predict(X[self.features], num_iteration=self.n_best)

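
# A similar sketch for AutoLGB, here on a synthetic binary-classification task
# (names, sizes, and targets are for illustration only):
#
#     >>> import numpy as np
#     >>> import pandas as pd
#     >>> X = pd.DataFrame(np.random.rand(20000, 10),
#     ...                  columns=[f'x{i}' for i in range(10)])
#     >>> y = pd.Series((X['x0'] + X['x1'] > 1).astype(int))
#     >>> model = AutoLGB(objective='binary', metric='auc', n_hpopt=10)
#     >>> model = model.tune(X, y)
#     >>> model = model.fit(X, y)
#     >>> p = model.predict(X)       # raw scores from the best iteration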