# Source code for clintk.utils.unfold
"""
unfolds merges dataframes into a big feature matrix 
All the features are labeled with a date and two keys for identification
Better explainations and schemas can be found on the repo wiki
"""
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix
from multiprocessing.pool import Pool
from sklearn.base import BaseEstimator
class Unfolder(BaseEstimator):
    """
    Takes a dataframe[key1, key2, feature, value, date] to build a matrix of
    the parameters grouped by [key1, key2, date]

    This object is to be used after a timeframe of the feature has been built
    to group them into a feature matrix.

    The idea is to facilitate the data preparation for a sequential learning
    task.

    Parameters
    ----------
    key1 : str
        name of the primary key column

    key2 : str
        name of the secondary key column

    feature : str
        name of the column holding the feature names

    value : str
        name of the column holding the value of the feature named in the
        `feature` column

    date : str
        name of the column holding the date at which the feature was measured

    group_date : bool, default=True
        set True to use date column as key to group data

    n_jobs : int
        number of worker processes to use for computation. If -1, all the
        available cores are used. If 1, the computation runs in the current
        process and no pool is created.
    """

    def __init__(self, key1, key2, feature, value, date,
                 group_date=True, n_jobs=1):
        self.key1 = key1
        self.key2 = key2
        self.feature = feature
        self.value = value
        self.date = date
        self.group_date = group_date
        self.n_jobs = n_jobs

        # dataframe stored by fit(); None until fit() has been called
        self.df_ = None

    def fit(self, df):
        """  saves dataframe for multiprocessing convenience

        Parameters
        ----------
        df : pandas.DataFrame

        Returns
        -------
        self
        """
        self.df_ = df

        return self

    def unfold(self):
        """ performs the unfolding transformation

        Returns
        -------
        pandas.DataFrame
            One row per group of [key1, key2, date] (or [key1, key2] when
            `group_date` is False) and one column per unique feature name,
            holding the mean of the values observed for that feature in the
            group. Groups appear in order of first appearance in the fitted
            dataframe (the group-by is not sorted).

        Raises
        ------
        AttributeError
            if `fit` has not been called beforehand
        """
        if self.df_ is None:
            # same exception type as the implicit `None.loc` failure, but
            # with an actionable message
            raise AttributeError("Unfolder is not fitted yet: call fit(df) "
                                 "before unfold()")

        # positional index so that the rows line up with the feature columns
        # returned by _add_columns (which are indexed 0..n-1)
        df_res = self.df_.loc[:, [self.key1, self.key2, self.date]]
        df_res = df_res.reset_index(drop=True)

        unique_features = self.df_[self.feature].unique()

        if self.n_jobs == 1:
            # no need to pay for process start-up and pickling
            new_cols = [self._add_columns(name) for name in unique_features]
        else:
            n_workers = None if self.n_jobs == -1 else self.n_jobs
            # the context manager releases the workers even if map() raises
            with Pool(n_workers) as pool:
                new_cols = pool.map(self._add_columns, unique_features)

        df_res = pd.concat([df_res] + list(new_cols), axis=1)

        # aggregation function for group by
        agg_dic = {key: 'mean' for key in unique_features}

        if self.group_date:
            group_keys = [self.key1, self.key2, self.date]
        else:
            df_res = df_res.drop(self.date, axis=1)
            group_keys = [self.key1, self.key2]

        df_grouped = df_res.groupby(by=group_keys,
                                    sort=False,
                                    as_index=False).agg(agg_dic)

        return df_grouped

    def _add_columns(self, feature_name):
        """ adds a column of a given feature

        This auxiliary function is to ease the use of multiprocess.pool.Pool

        Parameters
        ----------
        feature_name : str
            name of the feature we are adding to the dataframe

        Returns
        -------
        pandas.DataFrame
            contains a single column `feature_name` that contains values
            or NaN depending on the presence of the feature for each row.
            The frame has a positional 0..n-1 index.
        """
        is_feature = self.df_[self.feature] == feature_name
        # keep the value where the row belongs to the feature, NaN elsewhere;
        # vectorized, and independent of the labels of the fitted index
        masked = self.df_[self.value].where(is_feature)

        return pd.DataFrame(masked.values, columns=[feature_name])
def transform_and_label(df, key1, key2, date,  feature, value,
                        estimator, return_estimator=False,
                        **kwargs):
    """ Takes dataframe as input, applies transformation on value column and
    returns a new dataframe holding the transformed features

    Every component j of the transformed value of input row i becomes one
    output row labeled with the keys and date of row i and with the feature
    name suffixed by '_j'. The input dataframe is left untouched.

    Only implements unsupervised transformation

    @TODO keep sparse representation to unfold data

    Parameters
    ----------
    df : pandas.DataFrame
        should contain only one unique value in its `feature` column

    key1 : str
        primary key column

    key2 : str
        secondary key column

    date : str
        date column

    feature : str
        features names column

    value : str
        features values column

    estimator : sklearn.BaseEstimator
        sklearn compatible transformer class that implements .fit() and
        .transform() methods; it is instantiated with `**kwargs`

    return_estimator : bool
        if true, returns the trained estimator

    **kwargs : additional keyword arguments for estimator object

    Returns
    -------
    pandas.DataFrame
        dataframe with the same columns as `df` that contains only the rows
        generated for the transformed feature. When the transformation
        returns a scipy csr_matrix, only its non-zero entries produce rows.
        Columns of `df` other than the five named ones are left empty.

    sklearn.BaseEstimator
        the fitted estimator, only returned if `return_estimator` is True
    """
    # unsupervised transformation
    old_col = df[value].values
    transformer = estimator(**kwargs).fit(old_col)
    new_col = transformer.transform(old_col)

    # converts pandas outputs to numpy.ndarray
    if isinstance(new_col, (pd.DataFrame, pd.Series)):
        new_col = new_col.values

    # a 1-D output is a single transformed component per input row
    if isinstance(new_col, np.ndarray) and new_col.ndim == 1:
        new_col = new_col.reshape(-1, 1)

    # (row, component) positions that generate an output row
    if isinstance(new_col, csr_matrix):
        nz_rows, nz_cols = new_col.nonzero()
        positions = zip(nz_rows, nz_cols)
    else:
        n_rows, n_cols = new_col.shape[0], new_col.shape[1]
        positions = ((i, j) for i in range(n_rows) for j in range(n_cols))

    # rows of the transformed matrix are positional, so the labels must be
    # read by position too: df may carry any index
    key1_values = df[key1].tolist()
    key2_values = df[key2].tolist()
    feature_values = df[feature].tolist()
    date_values = df[date].tolist()

    # filling new rows; the frame is built once instead of being re-allocated
    # for every appended row
    new_rows = [{key1: key1_values[i],
                 key2: key2_values[i],
                 feature: feature_values[i] + '_{}'.format(j),
                 value: new_col[i, j],
                 date: date_values[i]}
                for i, j in positions]

    df_res = pd.DataFrame(new_rows, columns=df.columns)

    if return_estimator:
        return df_res, transformer

    return df_res