import gc
from typing import Dict, Tuple, List
import warnings
import inspect
import cloudpickle
from joblib import Parallel, delayed
import pandas as pd
from toolz import compose
from toolz.curried import assoc, curry, dissoc, first, map, partial, pipe
from toolz.functoolz import identity
from fklearn.types import EvalFnType, LearnerFnType, LogType
from fklearn.types import SplitterFnType, ValidatorReturnType, PerturbFnType
[docs]def validator_iteration(data: pd.DataFrame,
train_index: pd.Index,
test_indexes: pd.Index,
fold_num: int,
train_fn: LearnerFnType,
eval_fn: EvalFnType,
predict_oof: bool = False) -> LogType:
"""
Perform an iteration of train test split, training and evaluation.
Parameters
----------
data : pandas.DataFrame
A Pandas' DataFrame with training and testing subsets
train_index : numpy.Array
The index of the training subset of `data`.
test_indexes : list of numpy.Array
A list of indexes of the testing subsets of `data`.
fold_num : int
The number of the fold in the current iteration
train_fn : function pandas.DataFrame -> prediction_function, predictions_dataset, logs
A partially defined learning function that takes a training set and
returns a predict function, a dataset with training predictions and training
logs.
eval_fn : function pandas.DataFrame -> dict
A partially defined evaluation function that takes a dataset with prediction and
returns the evaluation logs.
predict_oof : bool
Whether to return out of fold predictions on the logs
Returns
----------
A log-like dictionary evaluations.
"""
train_data = data.iloc[train_index]
empty_set_warn = "Splitter on validator_iteration in generating an empty training dataset. train_data.shape is %s" \
% str(train_data.shape)
warnings.warn(empty_set_warn) if train_data.shape[0] == 0 else None # type: ignore
predict_fn, train_out, train_log = train_fn(train_data)
eval_results = []
oof_predictions = []
for test_index in test_indexes:
test_predictions = predict_fn(data.iloc[test_index])
eval_results.append(eval_fn(test_predictions))
if predict_oof:
oof_predictions.append(test_predictions)
logs = {'fold_num': fold_num,
'train_log': train_log,
'eval_results': eval_results}
return assoc(logs, "oof_predictions", oof_predictions) if predict_oof else logs
[docs]@curry
def validator(train_data: pd.DataFrame,
split_fn: SplitterFnType,
train_fn: LearnerFnType,
eval_fn: EvalFnType,
perturb_fn_train: PerturbFnType = identity,
perturb_fn_test: PerturbFnType = identity,
predict_oof: bool = False) -> ValidatorReturnType:
"""
Splits the training data into folds given by the split function and
performs a train-evaluation sequence on each fold by calling
``validator_iteration``.
Parameters
----------
train_data : pandas.DataFrame
A Pandas' DataFrame with training data
split_fn : function pandas.DataFrame -> list of tuple
Partially defined split function that takes a dataset and returns
a list of folds. Each fold is a Tuple of arrays. The fist array in
each tuple contains training indexes while the second array
contains validation indexes.
train_fn : function pandas.DataFrame -> prediction_function, predictions_dataset, logs
A partially defined learning function that takes a training set and
returns a predict function, a dataset with training predictions and training
logs.
eval_fn : function pandas.DataFrame -> dict
A partially defined evaluation function that takes a dataset with prediction and
returns the evaluation logs.
perturb_fn_train : PerturbFnType
A partially defined corruption function that takes a dataset and returns
a corrupted dataset. Perturbation applied at train-time.
perturb_fn_test : PerturbFnType
A partially defined corruption function that takes a dataset and returns
a corrupted dataset. Perturbation applied at test-time.
predict_oof : bool
Whether to return out of fold predictions on the logs
Returns
----------
A list of log-like dictionary evaluations.
"""
folds, logs = split_fn(train_data)
train_fn = compose(train_fn, perturb_fn_train)
eval_fn = compose(eval_fn, perturb_fn_test)
def fold_iter(fold: Tuple[int, Tuple[pd.Index, pd.Index]]) -> LogType:
(fold_num, (train_index, test_indexes)) = fold
return validator_iteration(train_data, train_index, test_indexes, fold_num, train_fn, eval_fn, predict_oof)
zipped_logs = pipe(folds,
enumerate,
map(fold_iter),
partial(zip, logs))
def _join_split_log(log_tuple: Tuple[LogType, LogType]) -> Tuple[LogType, LogType]:
train_log = {}
split_log, validator_log = log_tuple
train_log["train_log"] = validator_log["train_log"]
return train_log, assoc(dissoc(validator_log, "train_log"), "split_log", split_log)
def get_perturbed_columns(perturbator: PerturbFnType) -> List[str]:
args = inspect.getfullargspec(perturbator).kwonlydefaults
return args['cols'] if args else []
train_logs, validator_logs = zip(*map(_join_split_log, zipped_logs))
first_train_log = first(train_logs)
perturbator_log = {'perturbated_train': [], 'perturbated_test': []} # type: LogType
if perturb_fn_train != identity:
perturbator_log['perturbated_train'] = get_perturbed_columns(perturb_fn_train)
if perturb_fn_test != identity:
perturbator_log['perturbated_test'] = get_perturbed_columns(perturb_fn_test)
first_train_log = assoc(first_train_log, "perturbator_log", perturbator_log)
return assoc(first_train_log, "validator_log", list(validator_logs))
def parallel_validator_iteration(train_data: pd.DataFrame,
fold: Tuple[int, Tuple[pd.Index, pd.Index]],
train_fn: LearnerFnType,
eval_fn: EvalFnType,
predict_oof: bool) -> LogType:
(fold_num, (train_index, test_indexes)) = fold
train_fn = cloudpickle.loads(train_fn)
eval_fn = cloudpickle.loads(eval_fn)
return validator_iteration(train_data, train_index, test_indexes, fold_num, train_fn, eval_fn, predict_oof)
[docs]@curry
def parallel_validator(train_data: pd.DataFrame,
split_fn: SplitterFnType,
train_fn: LearnerFnType,
eval_fn: EvalFnType,
n_jobs: int = 1,
predict_oof: bool = False) -> ValidatorReturnType:
"""
Splits the training data into folds given by the split function and
performs a train-evaluation sequence on each fold. Tries to run each
fold in parallel using up to n_jobs processes.
Parameters
----------
train_data : pandas.DataFrame
A Pandas' DataFrame with training data
split_fn : function pandas.DataFrame -> list of tuple
Partially defined split function that takes a dataset and returns
a list of folds. Each fold is a Tuple of arrays. The fist array in
each tuple contains training indexes while the second array
contains validation indexes.
train_fn : function pandas.DataFrame -> prediction_function, predictions_dataset, logs
A partially defined learning function that takes a training set and
returns a predict function, a dataset with training predictions and training
logs.
eval_fn : function pandas.DataFrame -> dict
A partially defined evaluation function that takes a dataset with prediction and
returns the evaluation logs.
n_jobs : int
Number of parallel processes to spawn.
predict_oof : bool
Whether to return out of fold predictions on the logs
Returns
----------
A list log-like dictionary evaluations.
"""
folds, logs = split_fn(train_data)
dumped_train_fn = cloudpickle.dumps(train_fn)
dumped_eval_fn = cloudpickle.dumps(eval_fn)
result = Parallel(n_jobs=n_jobs, backend="threading")(
delayed(parallel_validator_iteration)(train_data, x, dumped_train_fn, dumped_eval_fn, predict_oof)
for x in enumerate(folds))
gc.collect()
train_log = {"train_log": [fold_result["train_log"] for fold_result in result]}
@curry
def kwdissoc(d: Dict, key: str) -> Dict:
return dissoc(d, key)
validator_logs = pipe(result,
partial(zip, logs),
map(lambda log_tuple: assoc(log_tuple[1], "split_log", log_tuple[0])),
map(kwdissoc(key="train_log")),
list)
return assoc(train_log, "validator_log", validator_logs)