import operator
from typing import Callable, Dict, List, Optional, Tuple
import pandas as pd
from tqdm import tqdm
from finstmt.check import item_series_is_empty
from finstmt.config_manage.data import DataConfigManager, _key_pct_of_key
from finstmt.config_manage.statement import StatementConfigManager
from finstmt.exc import (
CouldNotParseException,
MixedFrequencyException,
NoSuchItemException,
)
from finstmt.findata.period_data import PeriodFinancialData
from finstmt.forecast.config import ForecastConfig
from finstmt.forecast.main import Forecast
from finstmt.items.config import ItemConfig
from finstmt.logger import logger
[docs]class FinStatementsBase:
statements: Dict[pd.Timestamp, PeriodFinancialData]
statement_name: str = "Base"
items_config_list: List[ItemConfig]
[docs] def __init__(self, *args, **kwargs):
raise NotImplementedError
def __post_init__(self):
self.df = self.to_df()
# Hook up prior statements to statements
dates = list(self.statements.keys())
dates.sort()
prior_date = None
for i, date in enumerate(dates):
if i != 0:
self.statements[date].prior_statement = self.statements[prior_date]
prior_date = date
# Create dictionary of individual time period configs to construct the entire statement config
configs_dict = {}
for date, statement in self.statements.items():
configs_dict[date] = statement.config_manager
self.config = StatementConfigManager(configs_dict)
def _repr_html_(self):
return self._formatted_df._repr_html_()
# Get longitudenal series for a statement item
def __getattr__(self, item):
data_dict = {}
for (
date,
statement,
) in self.statements.items():
try:
statement_value = getattr(statement, item)
except AttributeError:
# Should hit here on the first loop if this is an invalid item
# Raise attribute error like normal.
raise AttributeError(item)
if pd.isnull(statement_value):
statement_value = 0
data_dict[date] = statement_value
item_config: Optional[ItemConfig] = None
try:
item_config = self.config.get(item)
except NoSuchItemException:
pass
return pd.Series(
data_dict, name=item_config.display_name if item_config else item
)
def __getitem__(self, item):
if not isinstance(item, (list, tuple)):
date_item = pd.to_datetime(item)
return self.statements[date_item]
# Got multiple dates
all_series = []
for date_str in item:
series = self.df[date_str]
date = pd.to_datetime(date_str)
series.name = date
all_series.append(series)
df = pd.concat(all_series, axis=1)
return self.from_df(df, disp_unextracted=False)
def __dir__(self):
normal_attrs = [
"statements",
"to_df",
"freq",
]
item_attrs = dir(list(self.statements.values())[0])
return normal_attrs + item_attrs
[docs] @classmethod
def from_df(
cls,
df: pd.DataFrame,
items_config_list: Optional[List[ItemConfig]] = None,
disp_unextracted: bool = True,
):
"""
DataFrame must have columns as dates and index as names of financial statement items
"""
statements_dict = {}
dates = list(df.columns)
dates.sort(key=lambda t: pd.to_datetime(t))
if items_config_list is None:
config_manager = DataConfigManager(cls.items_config_list.copy())
else:
config_manager = DataConfigManager(items_config_list.copy())
for col in dates:
try:
statement = PeriodFinancialData.from_series(
df[col], config_manager=config_manager
)
except CouldNotParseException:
raise CouldNotParseException(
"Passed DataFrame did not have any statement items in the index. "
"Did you set the column with statement items to the index? Got index:",
df.index,
)
statement_date = pd.to_datetime(col)
statements_dict[statement_date] = statement
if disp_unextracted:
# Warn about unextracted names
all_unextracted_names = set()
for stmt_data in statements_dict.values():
all_unextracted_names.update(stmt_data.unextracted_names)
if all_unextracted_names:
logger.info(
f"Was not able to extract data from the following names: {all_unextracted_names}"
)
return cls(statements_dict)
[docs] def to_df(self) -> pd.DataFrame:
all_series = []
for date, statement in self.statements.items():
series = statement.to_series()
series.name = date
all_series.append(series)
return pd.concat(all_series, axis=1)
@property
def _formatted_df(self) -> pd.DataFrame:
out_df = self.df.copy()
out_df.columns = [col.strftime("%m/%d/%Y") for col in out_df.columns]
return out_df.applymap(lambda x: f"${x:,.0f}" if not x == 0 else " - ")
def _forecast(
self, statements, **kwargs
) -> Tuple[Dict[str, Forecast], Dict[str, pd.Series]]:
if "freq" not in kwargs:
freq = self.freq
if freq is None:
raise MixedFrequencyException(
"Could not automatically determine frequency of history. Likely there are mixed "
"frequencies in the data. Either pass an explicit freq to forecast or remove the "
"periods which do not match the frequency before running the forecast."
)
kwargs[
"freq"
] = freq # use historical frequency if desired frequency not passed
forecast_config = ForecastConfig(**kwargs)
forecast_dict: Dict[str, Forecast] = {}
results: Dict[str, pd.Series] = {}
logger.info(f"Forecasting {self.statement_name}")
item: ItemConfig
for item in tqdm(self.config.items):
if not item.forecast_config.make_forecast:
# If user set to skip the forecast, skip it as well
# By default, all calculated items will be skipped
continue
data = getattr(statements, item.key)
pct_of_series = None
pct_of_config = None
if item.forecast_config.pct_of is not None:
pct_of_series = getattr(statements, item.forecast_config.pct_of)
pct_of_config = statements.config.get(item.forecast_config.pct_of)
forecast = Forecast(
data,
forecast_config,
item.forecast_config,
item,
pct_of_series=pct_of_series,
pct_of_config=pct_of_config,
)
forecast.fit()
forecast.predict()
forecast_dict[item.key] = forecast
if forecast.result is not None:
forecast.result.name = item.primary_name
if item.forecast_config.pct_of is not None:
key_pct_of_key = _key_pct_of_key(item.key, item.forecast_config.pct_of)
results[key_pct_of_key] = forecast.result
else:
results[item.key] = forecast.result
return forecast_dict, results
@property
def freq(self) -> str:
return pd.infer_freq(self.dates)
@property
def dates(self) -> List[pd.Timestamp]:
return list(self.statements.keys())
[docs] def item_is_empty(self, key: str) -> bool:
item: pd.Series = getattr(self, key)
return item_series_is_empty(item)
def __add__(self, other):
if isinstance(other, (float, int)):
new_df = self.df + other
elif isinstance(other, FinStatementsBase):
new_df = combine_statement_dfs(self.df, other.df, operation=operator.add)
else:
raise NotImplementedError(
f"cannot add type {type(other)} to type {type(self)}"
)
# TODO [#42]: combined statements retain only item config of first statements
#
# Think about the best way to handle this. This applies to all math dunder methods.
new_statements = type(self).from_df(
new_df, self.config.items, disp_unextracted=False
)
return new_statements
def __radd__(self, other):
return self.__add__(other)
def __mul__(self, other):
if isinstance(other, (float, int)):
new_df = self.df * other
elif isinstance(other, FinStatementsBase):
new_df = combine_statement_dfs(self.df, other.df, operation=operator.mul)
else:
raise NotImplementedError(
f"cannot multiply type {type(other)} to type {type(self)}"
)
new_statements = type(self).from_df(
new_df, self.config.items, disp_unextracted=False
)
return new_statements
def __rmul__(self, other):
return self.__mul__(other)
def __sub__(self, other):
if isinstance(other, (float, int)):
new_df = self.df - other
elif isinstance(other, FinStatementsBase):
new_df = combine_statement_dfs(self.df, other.df, operation=operator.sub)
else:
raise NotImplementedError(
f"cannot subtract type {type(other)} to type {type(self)}"
)
new_statements = type(self).from_df(
new_df, self.config.items, disp_unextracted=False
)
return new_statements
def __rsub__(self, other):
return (-1 * self) + other
def __truediv__(self, other):
if isinstance(other, (float, int)):
new_df = self.df / other
elif isinstance(other, FinStatementsBase):
new_df = combine_statement_dfs(
self.df, other.df, operation=operator.truediv
)
else:
raise NotImplementedError(
f"cannot divide type {type(other)} to type {type(self)}"
)
new_statements = type(self).from_df(
new_df, self.config.items, disp_unextracted=False
)
return new_statements
def __rtruediv__(self, other):
if isinstance(other, (float, int)):
new_df = other / self.df
else:
raise NotImplementedError(
f"cannot divide type {type(other)} to type {type(self)}"
)
new_statements = type(self).from_df(
new_df, self.config.items, disp_unextracted=False
)
return new_statements
def __round__(self, n=None) -> "FinStatementsBase":
new_df = round(self.df, n)
new_statements = type(self).from_df(
new_df, self.config.items, disp_unextracted=False
)
return new_statements
[docs]def combine_statement_dfs(
df: pd.DataFrame,
df2: pd.DataFrame,
operation: Callable[[pd.DataFrame, pd.DataFrame], pd.DataFrame] = operator.add,
) -> pd.DataFrame:
common_cols = [col for col in df.columns if col in df2.columns]
df_unique_cols = [col for col in df.columns if col not in df2.columns]
df2_unique_cols = [col for col in df2.columns if col not in df.columns]
common_df = operation(df[common_cols], df2[common_cols])
result = pd.concat([common_df, df[df_unique_cols], df2[df2_unique_cols]], axis=1)
cols = sorted(list(result.columns))
return result[cols]