from typing import List, Union
import statsmodels.api as sm
import pandas as pd
import numpy as np
from numpy import nan
from multiprocessing import Pool
import time
from pd_utils.utils import split
def reg_by(df: pd.DataFrame, yvar: str, xvars: List[str], groupvar: Union[str, List[str]],
           merge: bool = False, cons: bool = True, mp: Union[bool, int] = False,
           stderr: bool = False) -> pd.DataFrame:
"""
    Runs a regression of df[yvar] on df[xvars] within each group defined by groupvar. Outputs a DataFrame
    with one row per group and the corresponding coefficients, unless merge=True, in which case the
    original DataFrame is returned with the appropriate coefficients merged in.

    :param df: DataFrame containing the y, x, and group columns
    :param yvar: column name of the dependent variable
    :param xvars: column name(s) of the independent variable(s)
    :param groupvar: column name(s) of the column(s) identifying the by groups
    :param merge: True to merge the coefficients back onto the original DataFrame
    :param cons: True to include a constant, False to not
    :param mp: False to use a single process, True to use all processors, or an int to use that many processes
    :param stderr: True to include (HC1 robust) standard errors of the coefficients
    :return: DataFrame of group values and coefficients (plus standard errors if stderr=True), or the
        original DataFrame with those columns merged in if merge=True
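
    :Example:

    Illustrative only; assumes ``data`` is a DataFrame with columns ``ret``, ``mktrf``, and ``ticker``:

    >>> coef_df = reg_by(data, 'ret', ['mktrf'], 'ticker')
    >>> merged_df = reg_by(data, 'ret', ['mktrf'], 'ticker', merge=True, stderr=True)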
"""
# Convert xvars to list if str is passed
xvars = _check_inputs_regby(xvars)
# If there are multiple groupvars, create a key variable containing all groupvars (modifies df inplace)
groupvar, drop_group = _set_groupvar_and_drop_group(df, groupvar)
# Select dataframe of only y, x and group vars, drop any missings
yx_df = df_for_reg(df, yvar, xvars, groupvar)
# Create a list of right hand side variables. Includes 'const' if cons is True
rhs = _set_rhs(xvars, cons)
# Split DataFrame into a list of arrays with each bygroup being one array. Provide an accompanying list of bygroups
arrs, groups = _get_lists_of_arrays_and_groups(yx_df, yvar, xvars, groupvar)
# Run regressions by groups, storing results as a list of numpy arrays
results = _reg_by(arrs, groups, xvars, rhs, cons, mp=mp, stderr=stderr)
# Combine list of arrays into df, and apply column labels
result_df = _result_list_of_arrays_to_df(results, rhs, groupvar, stderr=stderr)
if cons:
result_df.rename(columns={"coef_const": "const"}, inplace=True)
if merge:
result_df = df.merge(result_df, how="left", on=groupvar)
if drop_group:
result_df.drop(groupvar, axis=1, inplace=True)
return result_df.reset_index(drop=True)
def df_for_reg(df, yvar, xvars, groupvar):
# Select dataframe of only y and x vars
yx_df = df.loc[:, xvars + [yvar]]
# Recombine groupvar and drop missing
yx_df = pd.concat([yx_df, df[groupvar]], axis=1).dropna()
    # Groups with too few observations are handled downstream in _reg, which
    # returns NaN coefficients for them
return yx_df
def _reg_by(arrs, groups, xvars, rhs, cons, mp=False, stderr=False):
    if mp:
        # Forward mp so that an integer process count is respected
        return _reg_by_mp(arrs, groups, xvars, rhs, cons, mp=mp, stderr=stderr)
    else:
        return _reg_by_sp(arrs, groups, xvars, rhs, cons, stderr=stderr)
def _reg_by_sp(arrs, groups, xvars, rhs, cons, stderr=False):
results = []
for i, arr in enumerate(arrs):
results.append(_reg(arr, xvars, rhs, cons, groups[i], stderr))
return results
def _reg_by_mp(arrs, groups, xvars, rhs, cons, mp=True, stderr=False):
    # bool is a subclass of int, so check for bool first: True means use all
    # processors (Pool(None)), while a plain int sets the number of processes
    if isinstance(mp, bool):
        num_processes = None
    else:
        num_processes = mp
num_expected_results = len(arrs)
num_completed = 0
with Pool(num_processes) as pool:
expected_results = []
for i, arr in enumerate(arrs):
expected_results.append(
pool.apply_async(
_reg,
(arr, xvars, rhs, cons, groups[i], stderr)
)
)
while num_completed != num_expected_results:
time.sleep(1)
            num_completed = sum(result.ready() for result in expected_results)
print(
f"Finished {num_completed} out of {num_expected_results} calculations",
end="\r",
)
completed_results = [result.get() for result in expected_results]
return completed_results
def _log_mp_status(result, result_counter):
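    # Intended as an apply_async callback: `result` is the worker's return
    # value and `result_counter` a shared ResultCounter tracking progress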
result_counter += 1
print(result_counter, end="\r")
class ResultCounter:

    def __init__(self, num_expected_results):
        self.count = 0
        self.expected_results = num_expected_results
    def __iadd__(self, other):
        # Return self so that "counter += 1" rebinds the name to this same
        # (mutated) instance instead of to None
        self.count += other
        return self

    def __repr__(self):
        return f"<ResultCounter(count={self.count}, expected={self.expected_results})>"
def __str__(self):
return f"Finished {self.count} out of {self.expected_results} calculations"
def _reg(arr, xvars, rhs, cons, group, stderr):
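    # Each returned row is laid out as [coef_1, ..., coef_k, group] or, with
    # stderr=True, [coef_1, ..., coef_k, group, se_1, ..., se_k], matching the
    # column order produced by _set_cols_by_stderr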
X = arr[:, 1:].astype(float)
min_obs = len(xvars) + 1
if cons:
X = sm.add_constant(X)
min_obs += 1
y = arr[:, 0].astype(float)
if arr.shape[0] >= min_obs: # if enough observations, run regression
model = sm.OLS(y, X)
result = model.fit()
this_result = np.append(result.params, group) # add groupvar
if stderr:
this_result = np.append(this_result, result.HC1_se) # robust stderr
this_result = this_result[None, :] # cast 1d array into 2d array
    else:  # not enough obs, return NaNs with the same width as the success branch
        num_cols = len(rhs) + 1 + (len(rhs) if stderr else 0)
        this_result = np.empty((1, num_cols), dtype="O")
        this_result[:] = nan
        this_result[0, len(rhs)] = group
return this_result
def _check_inputs_regby(xvars):
if isinstance(xvars, str):
xvars = [xvars]
assert isinstance(xvars, list)
return xvars
def _set_groupvar_and_drop_group(df, groupvar):
drop_group = False
if isinstance(groupvar, list):
df["__key_regby__"] = ""
for var in groupvar:
df["__key_regby__"] = df["__key_regby__"] + df[var].astype(str)
groupvar = "__key_regby__"
drop_group = True
return groupvar, drop_group
def _set_rhs(xvars, cons):
if cons:
rhs = ["const"] + xvars
else:
rhs = xvars
return rhs
def _result_list_of_arrays_to_df(results, rhs, groupvar, stderr=False):
result_df = pd.DataFrame(np.concatenate(results, axis=0))
result_df = result_df.apply(pd.to_numeric, errors="ignore")
result_df.columns = _set_cols_by_stderr(rhs, groupvar, stderr)
result_df[groupvar] = result_df[groupvar].astype(str)
return result_df
def _get_lists_of_arrays_and_groups(df, yvar, xvars, groupvar):
df.sort_values(groupvar, inplace=True)
arrs = split(df, [yvar] + xvars, keyvar=groupvar)
groups = df[groupvar].unique().tolist()
assert len(arrs) == len(groups)
return arrs, groups
def _set_cols_by_stderr(rhs, groupvar, stderr):
coef_cols = ["coef_" + col for col in rhs]
if not stderr:
cols = coef_cols + [groupvar]
else:
stderr_cols = ["stderr_" + col for col in rhs]
cols = coef_cols + [groupvar] + stderr_cols
return cols
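

if __name__ == "__main__":
    # Minimal usage sketch with synthetic data; the column and group names
    # here are hypothetical. Each group gets its own OLS fit of y on x1.
    rng = np.random.default_rng(0)
    demo = pd.DataFrame({
        "group": np.repeat(["a", "b", "c"], 50),
        "x1": rng.normal(size=150),
    })
    demo["y"] = 2 * demo["x1"] + rng.normal(size=150)
    print(reg_by(demo, "y", ["x1"], "group", stderr=True))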