"""Source code for fklearn.metrics.pd_extractors."""

import collections.abc
from datetime import datetime
from itertools import chain, repeat

import pandas as pd
from toolz import curry
from numpy import nan


@curry
def evaluator_extractor(result, evaluator_name):
    """Build a one-row DataFrame holding a single metric from an evaluator log.

    Parameters
    ----------
    result : dict
        Evaluator log mapping metric names to values; may be empty or None.
    evaluator_name : str
        Key of the metric to read, also used as the output column name.

    Returns
    -------
    pandas.DataFrame
        One row, one column named ``evaluator_name``; NaN when ``result`` is falsy.
    """
    if result:
        metric = result[evaluator_name]
    else:
        # no log available for this slice: keep the row but mark the metric missing
        metric = nan
    return pd.DataFrame({evaluator_name: [metric]})
@curry
def combined_evaluator_extractor(result, base_extractors):
    """Apply several extractors to the same log and join their columns side by side.

    Parameters
    ----------
    result : dict
        Evaluator log passed unchanged to every extractor.
    base_extractors : list of callables
        Each takes the log and returns a DataFrame; results are concatenated on axis=1.
    """
    partial_frames = [extract_fn(result) for extract_fn in base_extractors]
    return pd.concat(partial_frames, axis=1)
@curry
def split_evaluator_extractor_iteration(split_value, result, split_col, base_extractor, eval_name=None):
    """Extract metrics for one value of a split-evaluator log.

    Looks up the sub-log stored under ``<eval_name>_<split_value>`` (falling back
    to an empty dict when absent), runs ``base_extractor`` on it, and tags the
    resulting rows with the split value in a column named ``eval_name``.
    """
    if eval_name is None:
        eval_name = 'split_evaluator__' + split_col
    # missing split keys degrade gracefully: the extractor sees an empty log
    sub_log = result.get(eval_name + '_' + str(split_value), {})
    extracted = base_extractor(sub_log)
    return extracted.assign(**{eval_name: split_value})
@curry
def split_evaluator_extractor(result, split_col, split_values, base_extractor, eval_name=None):
    """Extract metrics for every requested split value and stack them into one DataFrame.

    Delegates each value to ``split_evaluator_extractor_iteration`` and
    concatenates the per-value frames row-wise.
    """
    frames = [
        split_evaluator_extractor_iteration(value, result, split_col, base_extractor, eval_name)
        for value in split_values
    ]
    return pd.concat(frames)
@curry
def temporal_split_evaluator_extractor(result, time_col, base_extractor, time_format="%Y-%m", eval_name=None):
    """Extract metrics from a temporal split-evaluator log.

    Discovers the split values by scanning ``result``'s keys for the evaluator
    prefix and keeping only the suffixes that parse with ``time_format``, then
    delegates to ``split_evaluator_extractor``.

    Parameters
    ----------
    result : dict
        Split-evaluator log whose keys look like ``<eval_name>_<date>``.
    time_col : str
        Column the evaluator split on; used to build the default ``eval_name``.
    base_extractor : callable
        Extractor applied to each per-period sub-log.
    time_format : str
        ``datetime.strptime`` format the date suffixes must match.
    eval_name : str, optional
        Evaluator key prefix; defaults to ``'split_evaluator__' + time_col``.
    """
    if eval_name is None:
        eval_name = 'split_evaluator__' + time_col

    split_keys = [key for key in result.keys() if eval_name in key]
    split_values = []
    for key in split_keys:
        date = key.split(eval_name)[1][1:]
        try:
            # just check the time format; keep the suffix only if it parses
            datetime.strptime(date, time_format)
            split_values.append(date)
        except ValueError:
            # this might happen if result has temporal splitters using different date formats
            pass

    # BUG FIX: forward eval_name so a custom prefix is used for lookups too.
    # Previously split_evaluator_extractor rebuilt the default prefix and a
    # non-default eval_name matched keys here but missed them all downstream.
    return split_evaluator_extractor(result, time_col, split_values, base_extractor, eval_name=eval_name)
@curry
def learning_curve_evaluator_extractor(result, base_extractor):
    """Run ``base_extractor`` on a learning-curve log and tag the rows with the
    period end stored under ``result['lc_period_end']``."""
    extracted = base_extractor(result)
    return extracted.assign(lc_period_end=result['lc_period_end'])
@curry
def reverse_learning_curve_evaluator_extractor(result, base_extractor):
    """Run ``base_extractor`` on a reverse learning-curve log and tag the rows
    with the period start stored under ``result['reverse_lc_period_start']``."""
    extracted = base_extractor(result)
    return extracted.assign(reverse_lc_period_start=result['reverse_lc_period_start'])
@curry
def stability_curve_evaluator_extractor(result, base_extractor):
    """Run ``base_extractor`` on a stability-curve log and tag the rows with
    the period stored under ``result['sc_period']``."""
    extracted = base_extractor(result)
    return extracted.assign(sc_period=result['sc_period'])
@curry
def repeat_split_log(split_log, results_len):
    """Stretch a split-log entry so it lines up with the extracted result rows.

    When ``split_log`` is iterable (one entry per fold slice), each element is
    repeated block-wise until the output has ``results_len`` items, e.g.
    ``[1, 2, 3]`` with ``results_len=9`` becomes
    ``[1, 1, 1, 2, 2, 2, 3, 3, 3]``. Non-iterable entries are returned
    unchanged (pandas broadcasts scalars on assignment).
    """
    if not isinstance(split_log, collections.abc.Iterable):
        return split_log
    n_repeat = results_len // len(split_log)
    # zip(*repeat(xs, n)) yields one tuple per element, each holding n copies;
    # chaining them flattens back to a single block-repeated list
    return list(chain.from_iterable(zip(*repeat(split_log, n_repeat))))
@curry
def extract_base_iteration(result, extractor):
    """Extract one validation fold into a DataFrame.

    Runs ``extractor`` over every entry of ``result['eval_results']``, stacks
    the frames, then annotates the rows with the fold number and with the
    fold's split log (each log entry stretched via ``repeat_split_log`` to
    match the number of extracted rows).
    """
    evals_df = pd.concat([extractor(ev) for ev in result['eval_results']])
    n_rows = len(evals_df)
    split_assignments = {
        key: repeat_split_log(value, n_rows)
        for key, value in result['split_log'].items()
    }
    # two assign steps on purpose: split-log columns may overwrite fold_num
    return (evals_df
            .assign(fold_num=result['fold_num'])
            .assign(**split_assignments))
@curry
def extract(validator_results, extractor):
    """Extract every fold of a validator log and stack the per-fold frames row-wise."""
    fold_frames = [
        extract_base_iteration(fold_result, extractor)
        for fold_result in validator_results
    ]
    return pd.concat(fold_frames)
@curry
def extract_lc(validator_results, extractor):
    """Extract a learning-curve validator log, tagging rows with ``lc_period_end``."""
    lc_extractor = learning_curve_evaluator_extractor(base_extractor=extractor)
    return extract(validator_results, lc_extractor)
@curry
def extract_reverse_lc(validator_results, extractor):
    """Extract a reverse learning-curve validator log, tagging rows with
    ``reverse_lc_period_start``."""
    reverse_lc_extractor = reverse_learning_curve_evaluator_extractor(base_extractor=extractor)
    return extract(validator_results, reverse_lc_extractor)
@curry
def extract_sc(validator_results, extractor):
    """Extract a stability-curve validator log, tagging rows with ``sc_period``."""
    sc_extractor = stability_curve_evaluator_extractor(base_extractor=extractor)
    return extract(validator_results, sc_extractor)
@curry
def extract_param_tuning_iteration(iteration, tuning_log, base_extractor, model_learner_name):
    """Extract one hyper-parameter tuning iteration's validation metrics,
    annotated with the parameters that produced them."""
    iteration_log = tuning_log[iteration]
    metrics_df = base_extractor(iteration_log["validator_log"])
    params = iteration_log["train_log"][model_learner_name]["parameters"]
    return metrics_df.assign(**params)
@curry
def extract_tuning(tuning_log, base_extractor, model_learner_name):
    """Extract every iteration of a hyper-parameter tuning log and stack them row-wise."""
    iteration_frames = [
        extract_param_tuning_iteration(i, tuning_log, base_extractor, model_learner_name)
        for i in range(len(tuning_log))
    ]
    return pd.concat(iteration_frames)
@curry
def permutation_extractor(results, base_extractor):
    """Build a feature-indexed DataFrame of permutation-importance metrics.

    One row per shuffled feature (index = feature name). When the log also
    carries ``permutation_importance_baseline``, a ``"baseline"`` row is
    appended and, for each metric column, a ``<col>_delta_from_baseline``
    column holds ``baseline - row`` (the drop caused by shuffling).
    """
    importance = results['permutation_importance']
    out = pd.concat(base_extractor(log) for log in importance.values())
    # each extracted frame is expected to be one row, so keys align with rows
    out.index = importance.keys()

    if 'permutation_importance_baseline' in results:  # with baseline comparison
        baseline = base_extractor(results['permutation_importance_baseline'])
        baseline.index = ["baseline"]
        out = pd.concat((out, baseline))
        for col in baseline.columns:
            out[col + '_delta_from_baseline'] = baseline[col].iloc[0] - out[col]

    return out