Source code for regtools.lag.create

from typing import Optional
import pandas as pd
from regtools.tools import _to_list_if_str
from pd_utils.filldata import add_missing_group_rows
from regtools.lag.remove import (
    lag_varname_to_varname_and_lag,
    VariableIsNotLaggedVariableException,
)


[docs]def create_lagged_variables( df, lag_cols, id_col: Optional[str] = None, date_col="Date", num_lags=1, fill_method="ffill", fill_limit: int = None, ): """ Note: partially inplace """ # Handle panel data versus not by whether id_col was passed lag_kwargs = dict(num_lags=num_lags) if id_col is not None: lag_kwargs.update(id_col=id_col) lag_func = _create_lagged_variable_panel df.sort_values([id_col, date_col], inplace=True) # Save original byvars, for outputting df of same shape orig_index_df = df[[id_col, date_col]] df = add_missing_group_rows( df, [id_col], [date_col], fill_method=fill_method, fill_limit=fill_limit ) else: lag_func = _create_lagged_variable # type: ignore for col in lag_cols: lag_func(df, col, **lag_kwargs) if id_col is not None: # Don't want to expand size of df df = orig_index_df.merge(df, how="left", on=[id_col, date_col]) return df
def _create_lagged_variable_panel(df, col, id_col="TICKER", num_lags=1): """ Note: inplace """ new_name = varname_to_lagged_varname(col, num_lags=num_lags) df[new_name] = df.groupby(id_col)[col].shift(num_lags) def _create_lagged_variable(df: pd.DataFrame, col: str, num_lags: int = 1) -> None: """ Note: inplace """ new_name = varname_to_lagged_varname(col, num_lags=num_lags) df[new_name] = df[col].shift(num_lags)
[docs]def varname_to_lagged_varname(varname: str, num_lags: int = 1) -> str: if num_lags == 0: # No lag string necessary return varname try: base_var, existing_lags = lag_varname_to_varname_and_lag(varname) except VariableIsNotLaggedVariableException: # Variable is not already lagged, so just add lag portion to str return _varname_to_lagged_varname(varname, num_lags) # Variable was lagged originally, need to add an additional number of lags and apply to base name total_lags = existing_lags + num_lags return _varname_to_lagged_varname(base_var, total_lags)
def _varname_to_lagged_varname(varname: str, num_lags: int = 1) -> str: return varname + f"$_{{t - {num_lags}}}$" def _convert_variable_names(yvar, xvars, lag_cols, num_lags=1): if yvar in lag_cols: yvar = varname_to_lagged_varname(yvar, num_lags=num_lags) out_xvars = [] for xvar in xvars: if xvar in lag_cols: out_xvars.append(varname_to_lagged_varname(xvar, num_lags=num_lags)) else: out_xvars.append(xvar) return yvar, out_xvars def _convert_interaction_tuples(interaction_tuples, lag_cols, num_lags=1): out_tuples = [] for tup in interaction_tuples: out_tuples.append( tuple( [ varname_to_lagged_varname(var, num_lags=num_lags) if (var in lag_cols) or (var + " Change" in lag_cols) else var for var in tup ] ) ) return out_tuples def _set_lag_variables(lag_variables, yvar, xvars): # Already passing a collection of columns, return if isinstance(lag_variables, (list, tuple)): return lag_variables assert isinstance(lag_variables, str) # Single str can either be a single column, 'all', or 'xvars' if lag_variables == "xvars": lag_variables = xvars.copy() elif lag_variables == "all": lag_variables = [yvar] + xvars else: # single column passed return _to_list_if_str(lag_variables) return lag_variables def _is_special_lag_keyword(lag_variables): if isinstance(lag_variables, (list, tuple)): return False # list of columns special_keywords = ("xvars", "all") return lag_variables in special_keywords