Source code for pd_utils.cum

import datetime
import functools
import sys
import timeit
import warnings
from copy import deepcopy
from itertools import chain
from multiprocessing import Pool
from typing import Optional, List, Union, Sequence

import numpy as np
import pandas as pd

from pd_utils.timer import estimate_time
from pd_utils.utils import split, split_gen
from pd_utils.merge import groupby_merge


def cumulate(
    df: pd.DataFrame,
    cumvars: Union[str, List[str]],
    method: str,
    periodvar="Date",
    byvars: Optional[Union[str, List[str]]] = None,
    time: Optional[Sequence[int]] = None,
    grossify: bool = False,
    multiprocess: Union[bool, int] = True,
    replace: bool = False,
):
    """
    Cumulates a variable over time. Typically used to get cumulative returns.

    :param df: dataframe containing the variables to be cumulated
    :param cumvars: column names to cumulate
    :param method: 'between', 'zero', or 'first'. If 'zero', will give returns
        since the original date. Note: for periods before the original date,
        this will turn positive returns negative as we are going backwards in
        time. If 'between', will give returns since the prior requested time
        period. Note that the first period is period 0. If 'first', will give
        returns since the first requested time period.
    :param periodvar: name of the column containing the period (e.g. date)
    :param byvars: column names to use to separate by groups
    :param time: for use with method='between'. Defines which periods to
        calculate between.
    :param grossify: set to True to add one to all variables then subtract one
        at the end
    :param multiprocess: set to True to use all available processors, set to
        False to use only one, pass an int less than or equal to the number of
        processors to use that many processors
    :param replace: True to return df with passed columns replaced with
        cumulated columns. False to return df with both passed columns and
        cumulated columns
    :return: dataframe with the cumulated columns added (named 'cum_' plus the
        original name), or replacing the originals if replace=True

    :Examples:

        For example, if our input data were for date 1/5/2006, but we had
        shifted dates::

            permno    date        RET     shift_date
            10516     1/5/2006    110%    1/5/2006
            10516     1/5/2006    120%    1/6/2006
            10516     1/5/2006    105%    1/7/2006
            10516     1/5/2006    130%    1/8/2006

        Then cumulate(df, 'RET', method='between', time=[1, 3],
        periodvar='shift_date') would return::

            permno    date        RET     shift_date    cum_RET
            10516     1/5/2006    110%    1/5/2006      110%
            10516     1/5/2006    120%    1/6/2006      120%
            10516     1/5/2006    105%    1/7/2006      126%
            10516     1/5/2006    130%    1/8/2006      130%

        Then cumulate(df, 'RET', method='first', periodvar='shift_date')
        would return::

            permno    date        RET     shift_date    cum_RET
            10516     1/5/2006    110%    1/5/2006      110%
            10516     1/5/2006    120%    1/6/2006      120%
            10516     1/5/2006    105%    1/7/2006      126%
            10516     1/5/2006    130%    1/8/2006      163.8%
    """
    import time as time2  # accidentally used time as an input parameter and don't want to break prior code

    if method == 'zero':
        raise NotImplementedError('method zero not implemented yet')
        # TODO [#1]: get method 'zero' of cumulate working
        #
        # Has some WIP already committed, commented out

    def log(message):
        if message != "\n":
            time = datetime.datetime.now().replace(microsecond=0)
            message = str(time) + ": " + message
        sys.stdout.write(message + "\n")
        sys.stdout.flush()

    log("Initializing cumulate.")

    df = df.copy()  # don't want to modify original dataframe

    sort_time: Optional[List[int]]
    if time:
        sort_time = sorted(time)
    else:
        sort_time = None

    if isinstance(cumvars, (str, int)):
        cumvars = [cumvars]
    assert isinstance(cumvars, list)
    assert isinstance(grossify, bool)

    if grossify:
        for col in cumvars:
            df[col] = df[col] + 1

    # For method 'zero' implementation
    # def unflip(df, cumvars):
    #     flipcols = ['cum_' + str(c) for c in cumvars]  # select cumulated columns
    #     for col in flipcols:
    #         tempdf[col] = tempdf[col].shift(1)  # shift all values down one row for cumvars
    #         tempdf[col] = -tempdf[col] + 2  # converts a positive return into a negative return
    #     tempdf = tempdf[1:].copy()  # drop out period 0
    #     tempdf = tempdf.sort_values(periodvar)  # resort to original order

    def flip(df, flip):
        # Reverse the period order within the windows listed in flip
        flip_df = df[df["window"].isin(flip)]
        rest = df[~df["window"].isin(flip)]
        flip_df = flip_df.sort_values(byvars + [periodvar], ascending=False)
        return pd.concat([flip_df, rest], axis=0)

    def _cumulate(array_list, mp=multiprocess):
        if multiprocess:
            if isinstance(multiprocess, int):
                return _cumulate_mp(array_list, mp=mp)  # use mp number of processors
            else:
                return _cumulate_mp(array_list)  # use all processors
        else:
            return _cumulate_sp(array_list)

    def _cumulate_sp(array_list):
        # Single-process: cumulative product over each group's array
        out_list = []
        for array in array_list:
            out_list.append(np.cumprod(array, axis=0))
        return np.concatenate(out_list, axis=0)

    def _cumulate_mp(array_list, mp=None):
        if mp:
            with Pool(mp) as pool:  # use mp number of processors
                return _cumulate_mp_main(array_list, pool)
        else:
            with Pool() as pool:  # use all processors
                return _cumulate_mp_main(array_list, pool)

    def _cumulate_mp_main(array_gen, pool):
        cum = functools.partial(np.cumprod, axis=0)
        return np.concatenate(list(pool.imap(cum, array_gen)), axis=0)

        # Earlier apply_async implementation with time estimation, kept for
        # reference but no longer used:
        # counter = []
        # num_loops = len(array_list)
        # start_time = timeit.default_timer()
        # results = [
        #     pool.apply_async(cum, (arr,), callback=counter.append)
        #     for arr in array_gen
        # ]
        # while len(counter) < num_loops:
        #     estimate_time(num_loops, len(counter), start_time)
        #     time2.sleep(0.5)
        # # Collect and output results. A timeout of 1 should be fine because
        # # it should wait until completion anyway
        # return np.concatenate([r.get() for r in results], axis=0)

    ##### TEMPORARY CODE #####
    assert method.lower() != "zero"
    ##########################

    if isinstance(byvars, str):
        byvars = [byvars]
    elif isinstance(byvars, list):
        byvars = deepcopy(byvars)  # don't overwrite existing list

    assert method.lower() in ("zero", "between", "first")
    assert not (
        (method.lower() == "between") and (time is None)
    )  # need time for between method
    if time is not None and method.lower() != "between":
        warnings.warn("Time provided but method was not between. Time will be ignored.")

    # Creates a variable containing the index of the window in which the observation belongs
    if method.lower() == "between":
        df = _map_windows(
            df, sort_time, method=method, periodvar=periodvar, byvars=byvars
        )
    else:
        df["__map_window__"] = 1
        df.loc[df[periodvar] == min(df[periodvar]), "__map_window__"] = 0

    if not byvars:
        byvars = ["__map_window__"]
    else:
        byvars.append("__map_window__")
    assert isinstance(byvars, list)

    # Need to determine when to cumulate backwards
    # Check whether method is zero, there are only negatives and zero, and
    # there is at least one negative in each window
    if method.lower() == "zero":
        raise NotImplementedError("need to implement method zero")
        # flip is a list of indices of windows for which the window should be flipped
        # to_flip = [j for j, window in enumerate(windows)
        #            if all([i <= 0 for i in window]) and any([i < 0 for i in window])]
        # df = flip(df, to_flip)

    log("Creating by groups.")

    # Create by groups
    df["__key_var__"] = "__key_var__"  # container for key
    for col in [df[c].astype(str) for c in byvars]:
        df["__key_var__"] += col

    array_gen = split_gen(df, cumvars)

    full_array = _cumulate(array_gen)

    new_cumvars = ["cum_" + str(c) for c in cumvars]

    cumdf = pd.DataFrame(full_array, columns=new_cumvars, dtype=np.float64)
    outdf = pd.concat([df.reset_index(drop=True), cumdf], axis=1)

    if method.lower() == "zero" and flip != []:  # if we flipped some of the dataframe
        pass  # TEMPORARY

    if grossify:
        all_cumvars = cumvars + new_cumvars
        for col in all_cumvars:
            outdf[col] = outdf[col] - 1

    if replace:
        outdf.drop(cumvars, axis=1, inplace=True)
        outdf.rename(
            columns={"cum_" + str(cumvar): cumvar for cumvar in cumvars}, inplace=True
        )

    drop_cols = [col for col in outdf.columns if col.startswith("__")]
    return outdf.drop(drop_cols, axis=1)
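# --- Example usage (added for illustration; not part of the original module) ---
# A minimal sketch of a cumulate call, assuming returns are stored as net
# returns (0.10 for 10%) so that grossify=True adds one before the cumulative
# product and subtracts it afterwards. The expected values follow the 'first'
# example in the docstring; multiprocess=False keeps the sketch single-process.
def _example_cumulate():  # hypothetical helper, not in the original source
    df = pd.DataFrame({
        "permno": [10516] * 4,
        "shift_date": pd.date_range("2006-01-05", periods=4),
        "RET": [0.10, 0.20, 0.05, 0.30],
    })
    result = cumulate(
        df, "RET", method="first", periodvar="shift_date",
        byvars="permno", grossify=True, multiprocess=False,
    )
    # result["cum_RET"] should be approximately 0.10, 0.20, 0.26, 0.638
    return result
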
def _map_windows(
    df, time, method="between", periodvar="Shift Date", byvars=["PERMNO", "Date"]
):
    """
    Returns the dataframe with an additional column __map_window__ containing
    the index of the window in which the observation resides. For example, if
    the windows are [[0], [1, 2]], and the periods are 1/1/2000, 1/2/2000,
    1/3/2000 for PERMNO 10516 with byvar 'a', the df rows would be as follows:
        (10516, 'a', '1/1/2000', 0),
        (10516, 'a', '1/2/2000', 1),
        (10516, 'a', '1/3/2000', 1),
    """
    df = df.copy()  # don't overwrite original dataframe
    wm = functools.partial(window_mapping, time, method=method)

    if byvars:
        df = groupby_merge(df, byvars, "transform", (wm), subset=periodvar)
    else:
        df[periodvar + "_transform"] = wm(df[periodvar])

    return df.rename(columns={periodvar + "_transform": "__map_window__"})
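# --- Example usage (added for illustration; not part of the original module) ---
# Reproduces the docstring scenario: with time=[1] the windows over three
# periods are [[0], [1, 2]], so the rows map to windows 0, 1, 1. This relies on
# groupby_merge from pd_utils.merge behaving as the call above implies.
def _example_map_windows():  # hypothetical helper, not in the original source
    df = pd.DataFrame({
        "PERMNO": [10516] * 3,
        "Date": ["a"] * 3,
        "Shift Date": pd.to_datetime(["2000-01-01", "2000-01-02", "2000-01-03"]),
    })
    result = _map_windows(df, [1])
    # result["__map_window__"] is expected to be 0, 1, 1
    return result
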
def create_windows(periods, time, method='between'):
    if method.lower() == 'first':
        windows = [[0]]
        windows += [[i for i in range(1, len(periods))]]
        return windows
    elif method.lower() == 'between':
        time = [t - time[0] for t in time]  # shifts time so that first period is period 0

        windows = [[0]]
        t_bot = 0
        for i, t in enumerate(time):  # pick each element of time
            if t == 0:
                continue  # already added zero
            windows.append([i for i in range(t_bot + 1, t + 1)])
            t_bot = t

        # The last window is all the leftover periods after finishing time
        extra_windows = [
            [i for i, per in enumerate(periods) if i not in chain.from_iterable(windows)]
        ]
        if extra_windows != [[]]:  # don't want to add empty window
            windows += extra_windows

        return windows
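# --- Example usage (added for illustration; not part of the original module) ---
# create_windows is a pure function, so its output is easy to verify directly:
# with four periods and time=[1, 3], method='between' yields period 0 alone,
# then the periods up through each later requested time, then the leftovers.
def _example_create_windows():  # hypothetical helper, not in the original source
    windows = create_windows(periods=range(4), time=[1, 3], method="between")
    assert windows == [[0], [1, 2], [3]]
    return windows
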
def window_mapping(time, col, method='between'):
    """
    Takes a pandas series of dates as input, calculates windows, and returns a
    list giving the index of the window each observation is in. To be used with
    groupby.transform()
    """
    windows = create_windows(col, time, method=method)
    return [
        n
        for i in range(len(col.index))
        for n, window in enumerate(windows)
        if i in window
    ]
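# --- Example usage (added for illustration; not part of the original module) ---
# Note that time comes before col because _map_windows binds time via
# functools.partial. With the windows from the create_windows example above,
# the four observations map to windows 0, 1, 1, 2.
def _example_window_mapping():  # hypothetical helper, not in the original source
    col = pd.Series(pd.date_range("2006-01-05", periods=4))
    assert window_mapping([1, 3], col, method="between") == [0, 1, 1, 2]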