Source code for regtools.cluster

from typing import List, Optional
import itertools

import pandas as pd

from regtools.ext_statsmodels import update_statsmodel_result_with_new_cov_matrix

StrOrNone = Optional[str]
StrOrNoneList = List[StrOrNone]
StrOrNoneListList = List[StrOrNoneList]


def estimate_model_handling_cluster(
    regdf: pd.DataFrame, model, cluster: List[str], **fit_kwargs
):
    """
    Handles multiway clustering through multiple estimations following
    Cameron, Gelbach, and Miller (2011).

    The model is estimated once for every group of cluster variables returned by
    ``_multiway_cluster_groups``. The covariance matrices of those estimations are
    then combined following eq 2.13 in CGM (2011): matrices from groups with an odd
    number of cluster variables are added, and matrices from groups with an even
    number of cluster variables are subtracted.

    :param regdf: data containing the cluster variables as columns
    :param model: model object whose ``fit`` method accepts ``cov_type`` and
        ``cov_kwds`` keyword arguments
    :param cluster: names of the columns in regdf on which to cluster
    :param fit_kwargs: additional keyword arguments passed to ``model.fit`` in
        every estimation
    :return: the result of the last estimation, updated to use the combined
        covariance matrix
    :raises ValueError: if no cluster groups could be formed from ``cluster``
    """
    cluster_groups = _multiway_cluster_groups(cluster)

    if not cluster_groups:
        raise ValueError(
            f"did not get any cluster groups, yet cluster was called with {cluster}"
        )

    cov_matrix = None
    for cluster_group in cluster_groups:
        result = _estimate_model_handling_single_cluster(
            regdf, model, cluster_group, **fit_kwargs
        )

        # The sign must be applied to every group, including the first one processed.
        # The order of cluster_groups is not guaranteed, so the first group may well
        # be an even-sized one which needs to be subtracted rather than added.
        sign = _negative_one_if_even_positive_one_if_odd(len(cluster_group))
        signed_cov_matrix = sign * result.cov_params()

        if cov_matrix is None:
            cov_matrix = signed_cov_matrix
        else:
            cov_matrix = cov_matrix + signed_cov_matrix

    # All parameter estimates should be identical, so can just override last result's
    # cov matrix to get final result
    update_statsmodel_result_with_new_cov_matrix(result, cov_matrix)

    return result
def _estimate_model_handling_single_cluster( regdf: pd.DataFrame, model, cluster: List[str], **fit_kwargs ): cluster_ids = _cluster_group_id_series(regdf, cluster) result = model.fit( cov_type="cluster", cov_kwds={"groups": cluster_ids}, **fit_kwargs ) return result def _multiway_cluster_groups(cluster_vars: List[str]) -> List[List[str]]: """ Transforms cluster_vars into the sets of cluster variables on which to run individual regressions, following Cameron, Gelbach, and Miller (2011). """ cluster_vectors = _cluster_vars_to_cluster_vector_lists(cluster_vars) all_cluster_groups = [] for group_tuple in itertools.product(*cluster_vectors): # group_tuple may come with Nones, such as ('Firm', None), or (None, None) # we only want to extract the non Nones valid_items = tuple([item for item in group_tuple if item is not None]) if len(valid_items) > 0: all_cluster_groups.append(valid_items) # Remove duplicates and convert tuples to lists all_cluster_groups_lists = [list(group) for group in set(all_cluster_groups)] return all_cluster_groups_lists def _cluster_vars_to_cluster_vector_lists(cluster_vars: List[str]) -> StrOrNoneListList: """ Transforms cluster_vars into a format which can be used with itertools.product. E.g. 
cluster_vars = ['Firm', 'Date'] -> [ ['Firm', None], [None, 'Date'] ] and cluster_vars = ['Firm', 'Date', 'Portfolio'] -> [ ['Firm', None, None], [None, 'Date', None], [None, None, 'Portfolio'] ] """ num_items = len(cluster_vars) all_lists: StrOrNoneListList = [] for i, cluster_var in enumerate(cluster_vars): output_list = [None] * num_items output_list[i] = cluster_var # type: ignore all_lists.append(output_list) # type: ignore return all_lists def _cluster_group_id_series(df: pd.DataFrame, cluster_vars: List[str]) -> pd.Series: unique_groups = df[cluster_vars].drop_duplicates() unique_groups["_group_id"] = range(0, len(unique_groups)) return df[cluster_vars].merge(unique_groups, how="left", on=cluster_vars)[ "_group_id" ] def _negative_one_if_even_positive_one_if_odd(num: int) -> int: if _is_even(num): return -1 else: return 1 def _is_even(num: int) -> bool: return num % 2 == 0