Source code for clintk.utils.unfold

"""
Unfolds and merges dataframes into a big feature matrix.
All the features are labeled with a date and two keys for identification.

Better explanations and schemas can be found on the repo wiki.
"""
import numpy as np
import pandas as pd

from scipy.sparse import csr_matrix
from multiprocessing.pool import Pool
from sklearn.base import BaseEstimator


class Unfolder(BaseEstimator):
    """ Takes a dataframe [key1, key2, feature, value, date] and builds a
    matrix of the features grouped by [key1, key2, date]

    This object is to be used after a timeframe of the feature has been built
    to group the features into a feature matrix. The idea is to facilitate
    the data preparation for a sequential learning task.

    Parameters
    ----------
    key1 : str
        primary key

    key2 : str
        secondary key

    feature : str
        name of the column that holds the feature names

    value : str
        name of the column that holds the values of `feature`

    date : str
        name of the column that holds the date at which `feature` was
        measured

    group_date : bool, default=True
        set True to use the date column as a key to group data

    n_jobs : int
        number of CPUs to use for computation. If -1, all the available
        cores are used
    """
    def __init__(self, key1, key2, feature, value, date, group_date=True,
                 n_jobs=1):
        self.key1 = key1
        self.key2 = key2
        self.feature = feature
        self.value = value
        self.date = date
        self.group_date = group_date
        self.n_jobs = n_jobs
        self.df_ = None

    def fit(self, df):
        """ Saves the dataframe for multiprocessing convenience

        Parameters
        ----------
        df : pandas.DataFrame

        Returns
        -------
        self
        """
        self.df_ = df
        return self

    def unfold(self):
        """ Performs the unfolding transformation

        Returns
        -------
        pandas.DataFrame
            The dataframe that contains the added feature columns.
            Rows are ordered by [key1, key2, date] for convenience.
        """
        df_res = self.df_.loc[:, [self.key1, self.key2, self.date]]

        if self.n_jobs == -1:
            pool = Pool()
        else:
            pool = Pool(self.n_jobs)

        # one new column per distinct feature, built in parallel
        unique_features = self.df_[self.feature].unique()
        new_cols = pool.map(self._add_columns, unique_features)
        pool.close()
        pool.join()

        df_res.reset_index(drop=True, inplace=True)
        df_res = pd.concat([df_res] + new_cols, axis=1)

        # aggregation function for the groupby
        agg_dic = {key: 'mean' for key in unique_features}

        if self.group_date:
            df_grouped = df_res.groupby(by=[self.key1, self.key2, self.date],
                                        sort=False,
                                        as_index=False).agg(agg_dic)
        else:
            df_res.drop(self.date, axis=1, inplace=True)
            df_grouped = df_res.groupby(by=[self.key1, self.key2],
                                        sort=False,
                                        as_index=False).agg(agg_dic)

        return df_grouped

    def _add_columns(self, feature_name):
        """ Adds a column for a given feature

        This auxiliary function is meant to ease the use of
        multiprocessing.pool.Pool

        Parameters
        ----------
        feature_name : str
            name of the feature to add to the dataframe

        Returns
        -------
        pandas.DataFrame
            a single column named `feature_name` that holds the value where
            the row matches the feature and NaN elsewhere
        """
        new_col = []
        for i in self.df_.index:
            if self.df_.at[i, self.feature] == feature_name:
                new_col.append(self.df_.at[i, self.value])
            else:
                new_col.append(np.nan)

        return pd.DataFrame(new_col, columns=[feature_name])
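

# Usage sketch for Unfolder (a hypothetical toy example, not taken from the
# library's docs or tests):
#
#     >>> data = pd.DataFrame({'id': ['p1', 'p1', 'p2'],
#     ...                      'visit': [1, 1, 1],
#     ...                      'feature': ['temp', 'weight', 'temp'],
#     ...                      'value': [37.2, 60.0, 36.8],
#     ...                      'date': pd.to_datetime(['2018-01-01'] * 3)})
#     >>> unfolder = Unfolder('id', 'visit', 'feature', 'value', 'date',
#     ...                     group_date=True, n_jobs=1)
#     >>> unfolded = unfolder.fit(data).unfold()
#
# `unfolded` has one row per (id, visit, date) group and one column per
# distinct feature ('temp', 'weight'), with NaN where a feature was not
# measured and the mean where it was measured several times.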


def transform_and_label(df, key1, key2, date, feature, value, estimator,
                        return_estimator=False, **kwargs):
    """ Applies a transformation to the value column of the input dataframe
    and returns a new dataframe whose rows hold the transformed feature

    The transformation returns a copy of the input dataframe.
    Only unsupervised transformations are implemented.

    @TODO keep sparse representation to unfold data

    Parameters
    ----------
    df : pandas.DataFrame
        should contain only one unique value in its `feature` column

    key1 : str
        primary key column

    key2 : str
        secondary key column

    date : str
        date column

    feature : str
        feature names column

    value : str
        feature values column

    estimator : sklearn.BaseEstimator
        sklearn compatible transformer that implements .fit() and
        .transform() methods

    return_estimator : bool
        if True, also returns the trained estimator

    **kwargs :
        additional keyword arguments for the estimator object

    Returns
    -------
    pandas.DataFrame
        a dataframe with the same columns as `df` whose rows hold the
        transformed feature, one row per (sample, component) pair
    """
    # unsupervised transformation
    old_col = df[value].values
    transformer = estimator(**kwargs).fit(old_col)
    new_col = transformer.transform(old_col)

    # converts pandas containers to numpy.ndarray
    if isinstance(new_col, (pd.DataFrame, pd.Series)):
        new_col = new_col.values
    # elif isinstance(new_col, csr_matrix):
    #     new_col = new_col.todense()

    # filling new rows
    rows = []
    if isinstance(new_col, csr_matrix):
        # keep only the non-zero entries of the sparse result
        nz_rows, nz_cols = new_col.nonzero()
        for i, j in zip(nz_rows, nz_cols):
            rows.append({key1: df.at[i, key1],
                         key2: df.at[i, key2],
                         feature: df.at[i, feature] + '_{}'.format(j),
                         value: new_col[i, j],
                         date: df.at[i, date]})
    else:
        for i in range(new_col.shape[0]):
            for j in range(new_col.shape[1]):
                rows.append({key1: df.at[i, key1],
                             key2: df.at[i, key2],
                             feature: df.at[i, feature] + '_{}'.format(j),
                             value: new_col[i, j],
                             date: df.at[i, date]})

    df_res = pd.DataFrame(rows, columns=df.columns)

    if return_estimator:
        return df_res, transformer
    else:
        return df_res
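

# A minimal, hedged demo of transform_and_label (hypothetical toy data;
# CountVectorizer is one example of a compatible sklearn transformer, chosen
# here because it exercises the sparse csr_matrix branch). It assumes `df`
# has a default RangeIndex, as required by the .at[i, ...] lookups above.
if __name__ == "__main__":
    from sklearn.feature_extraction.text import CountVectorizer

    # toy corpus: one text value per (id, visit, date) row
    reports = pd.DataFrame({'id': ['p1', 'p2'],
                            'visit': [1, 1],
                            'feature': ['report', 'report'],
                            'value': ['mild fever', 'severe fever'],
                            'date': pd.to_datetime(['2018-01-01'] * 2)})

    # bag-of-words expansion: one row per non-zero entry, with features
    # renamed 'report_0', 'report_1', ... so the result can be fed to Unfolder
    bow = transform_and_label(reports, 'id', 'visit', 'date',
                              'feature', 'value', CountVectorizer)
    print(bow)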