Source code for pd_utils.transform

import warnings
from typing import Union, List, Optional, Tuple

import numpy as np
import pandas as pd

from pd_utils.merge import groupby_merge
from pd_utils.utils import _to_list_if_str


def long_to_wide(
    df: pd.DataFrame,
    groupvars: Union[str, List[str]],
    values: Union[str, List[str]],
    colindex: Optional[Union[str, List[str]]] = None,
    colindex_only: bool = False,
):
    """
    Takes a "long" format DataFrame and converts it to a "wide" format

    :param df:
    :param groupvars: variables which signify unique observations in the output dataset
    :param values: variables which contain the values which need to be transposed
    :param colindex: columns containing the extension for the column name in the output
        dataset. If not specified, just uses the count of the row within the group. If a
        list is provided, each column value will be appended in order, separated by _
    :param colindex_only: If True, column names in the output data will be only the
        colindex and will not include the name of the values variable. Only valid when
        passing a single value, as otherwise multiple columns would have the same name.
    :return:

    :Examples:

        For example, if we had a long dataset of returns, with returns 12, 24, 36, 48,
        and 60 months after the date::

            ticker    ret    months
            AA        .01    12
            AA        .15    24
            AA        .21    36
            AA        -.10   48
            AA        .22    60

        and we want to get this to one observation per ticker::

            ticker    ret12    ret24    ret36    ret48    ret60
            AA        .01      .15      .21      -.10     .22

        We would use::

            long_to_wide(df, groupvars='ticker', values='ret', colindex='months')
    """
    df = df.copy()  # don't overwrite original

    # Check for duplicates
    if df.duplicated().any():
        df.drop_duplicates(inplace=True)
        warnings.warn("Found duplicate rows and deleted.")

    # Ensure type of groupvars is correct
    if isinstance(groupvars, str):
        groupvars = [groupvars]
    assert isinstance(groupvars, list)

    # Ensure type of values is correct
    if isinstance(values, str):
        values = [values]
    assert isinstance(values, list)

    if colindex_only and len(values) > 1:
        raise NotImplementedError(
            "set colindex_only to False when passing more than one value"
        )

    # Fixes for colindex
    # Use count of the row within the group for column index if not specified
    if colindex is None:
        df["__idx__"] = df.groupby(groupvars).cumcount()
        colindex = "__idx__"
    # If multiple columns are provided for colindex, combine and drop old cols
    if isinstance(colindex, list):
        df["__idx__"] = ""
        for col in colindex:
            df["__idx__"] = df["__idx__"] + "_" + df[col].astype(str)
            df.drop(col, axis=1, inplace=True)
        colindex = "__idx__"

    df["__key__"] = df[groupvars[0]].astype(str)  # create key variable
    if len(groupvars) > 1:  # if there are multiple groupvars, combine into one key
        for var in groupvars[1:]:
            df["__key__"] = df["__key__"] + "_" + df[var].astype(str)

    # Create separate wide datasets for each value variable, then merge them together
    for i, value in enumerate(values):
        if i == 0:
            combined = df.copy()
        # Create wide dataset
        raw_wide = df.pivot(index="__key__", columns=colindex, values=value)
        if not colindex_only:  # add value name
            raw_wide.columns = [value + str(col) for col in raw_wide.columns]
        else:  # remove _ from colindex name
            raw_wide.columns = [str(col).strip("_") for col in raw_wide.columns]
        wide = raw_wide.reset_index()

        # Merge back to original dataset
        combined = combined.merge(wide, how="left", on="__key__")

    return (
        combined.drop([colindex, "__key__"] + values, axis=1)  # type: ignore
        .drop_duplicates()
        .reset_index(drop=True)
    )
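
# Usage sketch for long_to_wide, mirroring the docstring example. `ret_df` is a
# hypothetical example name, not part of this module:
#
#     ret_df = pd.DataFrame({
#         'ticker': ['AA'] * 5,
#         'ret': [.01, .15, .21, -.10, .22],
#         'months': [12, 24, 36, 48, 60],
#     })
#     wide = long_to_wide(ret_df, groupvars='ticker', values='ret', colindex='months')
#     # wide has one row per ticker with columns ret12, ret24, ret36, ret48, ret60
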
def averages(
    df: pd.DataFrame,
    avgvars: Union[str, List[str]],
    byvars: Union[str, List[str]],
    wtvar: Optional[str] = None,
    count: Union[str, bool] = False,
    flatten: bool = True,
):
    """
    Returns equal- and value-weighted averages of variables within groups

    :param df:
    :param avgvars: variable names to take averages of
    :param byvars: variable names for by groups
    :param wtvar: variable to use for calculating weights in weighted average
    :param count: pass a variable name to get the count of non-missing values of that
        variable within groups
    :param flatten: False to return df with multi-level index
    :return:
    """
    # Check types
    assert isinstance(df, pd.DataFrame)
    if isinstance(avgvars, str):
        avgvars = [avgvars]
    else:
        assert isinstance(avgvars, list)
        avgvars = avgvars.copy()  # don't modify existing avgvars in place
    assert isinstance(byvars, (str, list))
    if wtvar is not None:
        assert isinstance(wtvar, str)

    df = df.copy()

    if count and isinstance(count, str):
        df = groupby_merge(df, byvars, "count", subset=count)
        avgvars += [count + "_count"]

    g = df.groupby(byvars)
    avg_df = g.mean()[avgvars]

    if wtvar is None:
        if flatten:
            return avg_df.reset_index()
        else:
            return avg_df

    for var in avgvars:
        colname = var + "_wavg"
        df[colname] = df[wtvar] / g[wtvar].transform("sum") * df[var]

    wavg_cols = [col for col in df.columns if col.endswith("wavg")]

    g = df.groupby(byvars)  # recreate because we now have _wavg cols in df
    wavg_df = g.sum()[wavg_cols]

    outdf = pd.concat([avg_df, wavg_df], axis=1)

    if flatten:
        return outdf.reset_index()
    else:
        return outdf
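
# Usage sketch for averages. `port_df` and its columns are hypothetical example
# names:
#
#     port_df = pd.DataFrame({
#         'portfolio': ['growth', 'growth', 'value', 'value'],
#         'ret': [.05, .15, .01, .03],
#         'mktcap': [1, 3, 2, 2],
#     })
#     averages(port_df, 'ret', byvars='portfolio', wtvar='mktcap')
#     # one row per portfolio, with ret (equal-weighted mean) and ret_wavg
#     # (the mktcap-weighted mean)
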
def winsorize(
    df: pd.DataFrame,
    pct: Union[float, Tuple[float, float]],
    subset: Optional[Union[str, List[str]]] = None,
    byvars: Optional[Union[str, List[str]]] = None,
    bot: bool = True,
    top: bool = True,
) -> pd.DataFrame:
    """
    Finds observations outside the pct percentiles and replaces them with the pct
    percentile value. Does this for all columns, or for the subset given by subset.

    :param df:
    :param pct: 0 < float < 1, or a tuple of two such values. If two values are given,
        the first will be used for the bottom percentile and the second for the top.
        If one value is given and both bot and top are True, the same value is used
        for both.
    :param subset: column name(s) to winsorize
    :param byvars: column names of columns identifying groups in the data.
        Winsorizing will be done within those groups.
    :param bot: True to winsorize bottom observations
    :param top: True to winsorize top observations
    :return:

    :Examples:

        >>> winsorize(df, .05, subset='RET')  # replaces observations of RET below the 5% and above the 95% values
        >>> winsorize(df, (.05, .1), subset='RET')  # replaces observations of RET below the 5% and above the 90% values
    """
    # Check inputs
    assert any([bot, top])  # must winsorize something
    if isinstance(pct, float):
        bot_pct = pct
        top_pct = 1 - pct
    elif isinstance(pct, (list, tuple)):
        bot_pct = pct[0]
        top_pct = 1 - pct[1]
    else:
        raise ValueError("pct must be a float or a tuple of two floats")

    def temp_winsor(col):
        return _winsorize(col, top_pct, bot_pct, top=top, bot=bot)

    # Save column order
    cols = df.columns

    # Get a dataframe of data to be winsorized, and a dataframe of the other columns
    to_winsor, rest = _select_numeric_or_subset(df, subset, extra_include=byvars)

    # Now winsorize
    if byvars:  # use groupby to process groups individually
        to_winsor = groupby_merge(
            to_winsor, byvars, "transform", temp_winsor, replace=True
        )
    else:  # do entire df, one column at a time
        to_winsor = to_winsor.apply(temp_winsor, axis=0)

    return pd.concat([to_winsor, rest], axis=1)[cols]
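
# Usage sketch for winsorize. `ret_df` is a hypothetical example name:
#
#     ret_df = pd.DataFrame({'RET': [-.9, .01, .02, .03, .95]})
#     winsorize(ret_df, .05, subset='RET')
#     # values of RET below the 5th percentile are raised to it, and values
#     # above the 95th percentile are lowered to it
#
#     winsorize(ret_df, .05, subset='RET', top=False)  # bottom tail only
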
def _winsorize(col, top_pct, bot_pct, top=True, bot=True):
    """
    Winsorizes a pandas Series
    """
    # Note: with copy=False, a Series that is already float64 is modified in place
    col = col.astype('float64', copy=False)
    if top:
        top_val = col.quantile(top_pct)
        col.loc[col > top_val] = top_val
    if bot:
        bot_val = col.quantile(bot_pct)
        col.loc[col < bot_val] = bot_val
    return col


def _select_numeric_or_subset(df, subset, extra_include=None):
    """
    If subset is None, selects all numeric columns. Else selects subset.
    If extra_include is not None and subset is None, will select all numeric columns
    plus those in extra_include.
    Returns a tuple of (dataframe containing subset columns, dataframe of other columns)
    """
    if subset is None:
        to_winsor = df.select_dtypes(include=[np.number, np.int64]).copy()
        subset = to_winsor.columns
        rest = df.select_dtypes(exclude=[np.number, np.int64]).copy()
    else:
        if isinstance(subset, str):
            subset = [subset]
        assert isinstance(subset, list)
        to_winsor = df[subset].copy()
        other_cols = [col for col in df.columns if col not in subset]
        rest = df[other_cols].copy()
    if extra_include:
        to_winsor = pd.concat([to_winsor, df[extra_include]], axis=1)
        rest.drop(extra_include, axis=1, inplace=True)
    return (to_winsor, rest)
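
# Behavior sketch for the helpers above (illustrative values only):
#
#     s = pd.Series([1., 2., 3., 4., 100.])
#     _winsorize(s.copy(), top_pct=0.8, bot_pct=0.2)
#     # values above the 80th percentile are capped at it; values below the
#     # 20th percentile are floored at it
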
def var_change_by_groups(
    df: pd.DataFrame,
    var: Union[str, List[str]],
    byvars: Union[str, List[str]],
    datevar: str = "Date",
    numlags: int = 1,
):
    """
    Used for getting variable changes over time within by groups.

    :Notes:

        The dataset is not sorted in this process. Sort the data in the order in which
        you wish lags to be created before running this command.

    :param df: dataframe containing by groups, a date variable, and variables of interest
    :param var: column names of variables to get changes of
    :param byvars: column names of variables identifying by groups
    :param datevar: column name of variable identifying periods
    :param numlags: number of periods to go back to get the change
    :return:
    """
    var, byvars, datevar = [
        _to_list_if_str(v) for v in [var, byvars, datevar]
    ]  # convert to lists
    assert isinstance(var, list)
    assert isinstance(byvars, list)
    assert isinstance(datevar, list)

    short_df = df.loc[
        ~pd.isnull(df[byvars]).any(axis=1), var + byvars + datevar
    ].drop_duplicates()
    for v in var:
        short_df[v + "_lag"] = short_df.groupby(byvars)[v].shift(numlags)
        short_df[v + "_change"] = short_df[v] - short_df[v + "_lag"]
    dropvars = [v for v in var] + [v + "_lag" for v in var]
    short_df = short_df.drop(dropvars, axis=1)
    return df.merge(short_df, on=datevar + byvars, how="left")
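
# Usage sketch for var_change_by_groups. `price_df` is a hypothetical example
# name:
#
#     price_df = pd.DataFrame({
#         'ticker': ['AA', 'AA', 'AA', 'BB', 'BB', 'BB'],
#         'Date': [2018, 2019, 2020, 2018, 2019, 2020],
#         'price': [10., 12., 11., 20., 25., 30.],
#     })
#     var_change_by_groups(price_df, 'price', byvars='ticker')
#     # adds a price_change column: price minus its value numlags rows back
#     # within each ticker (NaN for the first numlags rows of each group)
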
def state_abbrev(df: pd.DataFrame, col: str, toabbrev: bool = False):
    """
    Converts a DataFrame's column of state names to state abbreviations, or vice versa

    :param df:
    :param col: name of column containing state names or state abbreviations
    :param toabbrev: True to convert state names to abbreviations; defaults to
        converting abbreviations to state names
    :return:
    """
    df = df.copy()
    states_to_abbrev = {
        "Alabama": "AL", "Montana": "MT",
        "Alaska": "AK", "Nebraska": "NE",
        "Arizona": "AZ", "Nevada": "NV",
        "Arkansas": "AR", "New Hampshire": "NH",
        "California": "CA", "New Jersey": "NJ",
        "Colorado": "CO", "New Mexico": "NM",
        "Connecticut": "CT", "New York": "NY",
        "Delaware": "DE", "North Carolina": "NC",
        "Florida": "FL", "North Dakota": "ND",
        "Georgia": "GA", "Ohio": "OH",
        "Hawaii": "HI", "Oklahoma": "OK",
        "Idaho": "ID", "Oregon": "OR",
        "Illinois": "IL", "Pennsylvania": "PA",
        "Indiana": "IN", "Rhode Island": "RI",
        "Iowa": "IA", "South Carolina": "SC",
        "Kansas": "KS", "South Dakota": "SD",
        "Kentucky": "KY", "Tennessee": "TN",
        "Louisiana": "LA", "Texas": "TX",
        "Maine": "ME", "Utah": "UT",
        "Maryland": "MD", "Vermont": "VT",
        "Massachusetts": "MA", "Virginia": "VA",
        "Michigan": "MI", "Washington": "WA",
        "Minnesota": "MN", "West Virginia": "WV",
        "Mississippi": "MS", "Wisconsin": "WI",
        "Missouri": "MO", "Wyoming": "WY",
    }
    if toabbrev:
        df[col] = df[col].replace(states_to_abbrev)
    else:
        abbrev_to_states = {v: k for k, v in states_to_abbrev.items()}
        df[col] = df[col].replace(abbrev_to_states)

    return df
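
# Usage sketch for state_abbrev. `loc_df` is a hypothetical example name:
#
#     loc_df = pd.DataFrame({'state': ['TX', 'CA', 'NY']})
#     state_abbrev(loc_df, 'state')
#     # -> Texas, California, New York
#
#     state_abbrev(pd.DataFrame({'state': ['Texas']}), 'state', toabbrev=True)
#     # -> TX
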
def _join_col_strings(*args):
    strs = [str(arg) for arg in args]
    return "_".join(strs)


def join_col_strings(df: pd.DataFrame, cols: Union[str, List[str]]):
    """
    Takes a dataframe and column name(s) and concatenates string versions of the
    columns with those names. Useful when a group is identified by several variables
    and a single key variable is needed to describe the group. Returns a pandas Series.

    :param df:
    :param cols: names of columns in df to be concatenated
    :return:
    """
    if isinstance(cols, str):
        cols = [cols]
    assert isinstance(cols, list)

    jc = np.vectorize(_join_col_strings)

    return pd.Series(jc(*[df[col] for col in cols]))
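
# Usage sketch for join_col_strings. `firm_df` is a hypothetical example name:
#
#     firm_df = pd.DataFrame({'ticker': ['AA', 'BB'], 'year': [2019, 2020]})
#     join_col_strings(firm_df, ['ticker', 'year'])
#     # -> pd.Series(['AA_2019', 'BB_2020'])
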