Source code for fklearn.training.pipeline

from inspect import Parameter, signature
from typing import Dict

import pandas as pd
import toolz as fp

from fklearn.types import LearnerFnType, LearnerReturnType, PredictFnType


def build_pipeline(*learners: LearnerFnType) -> LearnerFnType:
    """
    Builds a pipeline of chained learners functions with the possibility of using keyword arguments
    in the predict functions of the pipeline.

    Say you have two learners, you create a pipeline with `pipeline = build_pipeline(learner1, learner2)`.
    Those learners must be functions with just one unfilled argument (the dataset itself).

    Then, you train the pipeline with `predict_fn, transformed_df, logs = pipeline(df)`,
    which will be like applying the learners in the following order: `learner2(learner1(df))`.

    Finally, you predict on different datasets with `pred_df = predict_fn(new_df)`, with optional kwargs.
    For example, if you have XGBoost or LightGBM, you can get SHAP values with
    `predict_fn(new_df, apply_shap=True)`. Each keyword argument is forwarded only to the predict
    functions whose signature declares a parameter with that name; keyword arguments that match no
    predict function are ignored.

    Parameters
    ----------
    learners : partially-applied learner functions.
        Each learner is called with the current DataFrame as its only argument and must return a
        `(predict_fn, transformed_df, log)` tuple.

    Returns
    ----------
    pipeline : function pandas.DataFrame -> (p, new_df, log)
        A learner function that trains all learners in sequence on a copy of the given DataFrame
        and returns the three values below.

        p : function pandas.DataFrame, **kwargs -> pandas.DataFrame
            A function that when applied to a DataFrame will apply all learner functions in
            sequence, with optional kwargs.

        new_df : pandas.DataFrame
            A DataFrame that is the result of applying all learner function in sequence.

        log : dict
            A log-like Dict that stores information of all learner functions. It also holds a
            "__fkml__" entry with the learner names ("pipeline"), the columns of the training
            output ("output_columns"), the columns of the training input ("features") and, per
            learner name, its predict function, its log and (when present) its model object
            ("learners").

    Raises
    ------
    AssertionError
        When building the pipeline, if a learner still has more than one unfilled argument.
        When training the pipeline, if a predict function declares `*args` or `**kwargs`.
    """

    def _has_one_unfilled_arg(learner: LearnerFnType) -> None:
        # '__no__default__' is the placeholder default that a curried learner exposes in its
        # signature for parameters that are still unfilled.
        # NOTE(review): presumably toolz.curry's sentinel -- confirm against the toolz version in use.
        # The isinstance guard avoids `==` on array-like defaults (numpy arrays, pandas objects),
        # whose element-wise result cannot be used as a boolean.
        no_default_list = [
            name for name, param in signature(learner).parameters.items()
            if isinstance(param.default, str) and param.default == '__no__default__'
        ]

        assert len(no_default_list) <= 1, "Learner {0} has more than one unfilled argument: {1}\n" \
                                          "Make sure all learners are curried properly and only require one argument," \
                                          " which is the dataset (usually `df`).".format(
            learner.__name__, ', '.join(no_default_list)
        )

    def _no_variable_args(learner: LearnerFnType, predict_fn: PredictFnType) -> None:
        # Keyword arguments are routed to predict functions by parameter name (see `predict_fn`
        # below), so variable-length parameters are rejected up front.
        invalid_parameter_kinds = (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD)
        var_args = [
            name for name, param in signature(predict_fn).parameters.items()
            if param.kind in invalid_parameter_kinds
        ]

        assert len(var_args) == 0, "Predict function of learner {0} contains variable length arguments: {1}\n" \
                                   "Make sure no predict function uses arguments like *args or **kwargs.".format(
            learner.__name__, ', '.join(var_args)
        )

    # Check for unfilled arguments of learners
    for learner in learners:
        _has_one_unfilled_arg(learner)

    def pipeline(data: pd.DataFrame) -> LearnerReturnType:
        # Work on a copy so the caller's DataFrame object is not the one handed to the learners.
        current_data = data.copy()
        features = list(data.columns)

        fns = []
        fns_params = []  # parameter names of each predict function, aligned with `fns`
        logs = []
        learner_names = []
        serialisation = {}  # type: dict

        for learner in learners:
            learner_fn, new_data, learner_log = learner(current_data)
            _no_variable_args(learner, learner_fn)  # Check for invalid predict fn arguments

            learner_name = learner.__name__

            # Move the model object (if any) out of the log so it is not part of the merged logs.
            # Compare against None explicitly: the truth value of a model object may be False or
            # undefined.
            model_objects = {}
            if learner_log.get("obj") is not None:
                model_objects["obj"] = learner_log.pop("obj")

            serialisation[learner_name] = {"fn": learner_fn, "log": learner_log, **model_objects}

            fns.append(learner_fn)
            # Resolve the signature once here instead of on every prediction call.
            fns_params.append(frozenset(signature(learner_fn).parameters))
            logs.append(learner_log)
            learner_names.append(learner_name)

            current_data = new_data

        merged_logs = fp.merge(logs)

        def predict_fn(df: pd.DataFrame, **kwargs: Dict) -> pd.DataFrame:
            # Get the proper arguments for each predict function (based on their signature)
            fns_args = [{k: v for k, v in kwargs.items() if k in params} for params in fns_params]

            # Partially apply the arguments to the predict functions when applicable
            fns_with_args = [fp.curry(fn)(**args) if args else fn for fn, args in zip(fns, fns_args)]

            return fp.pipe(df, *fns_with_args)

        merged_logs["__fkml__"] = {"pipeline": learner_names,
                                   "output_columns": list(current_data.columns),
                                   "features": features,
                                   "learners": {**serialisation}}

        return predict_fn, current_data, merged_logs

    return pipeline