Source code for finstmt.resolver.solve

from typing import Dict, List, Sequence, Tuple, Union

import numpy as np
import pandas as pd
from sympy import Eq, Expr, Indexed, IndexedBase, Symbol, expand, sympify
from sympy.logic.boolalg import BooleanFalse, BooleanTrue

from finstmt.items.config import ItemConfig

PLUG_SCALE = 1e11


[docs]def sympy_dict_to_results_dict( s_dict: Dict[IndexedBase, float], forecast_dates: pd.DatetimeIndex, item_configs: List[ItemConfig], t_offset: int = 0, ) -> Dict[str, pd.Series]: item_config_dict: Dict[str, ItemConfig] = { config.key: config for config in item_configs } new_results = {} for expr in s_dict: key = str(expr.base) # type: ignore[attr-defined] try: config = item_config_dict[key] except KeyError: # Must be pct of item, don't need in final results continue new_results[key] = pd.Series( index=forecast_dates, dtype="float", name=config.primary_name ) for expr, val in s_dict.items(): key = str(expr.base) # type: ignore[attr-defined] t = int(expr.indices[0]) - t_offset # type: ignore[attr-defined] if t < 0: # Don't need to store historical results continue if key not in new_results: # Pct of item, skip it, don't need in final results continue new_results[key].iloc[t] = float(val) return new_results
[docs]def results_dict_to_sympy_dict( results_dict: Dict[str, pd.Series], sympy_namespace: Dict[str, Expr] ) -> Dict[IndexedBase, float]: out_dict = {} for key, series in results_dict.items(): arr = series.values for i, val in enumerate(arr): t_str = f"{key}[{i + 1}]" lhs = sympify(t_str, locals=sympy_namespace) out_dict[lhs] = val return out_dict
[docs]def get_solve_eqs_and_full_subs_dict( eqs_for_sub: List[Eq], subs_dict: Dict[IndexedBase, float] ) -> Tuple[List[Eq], Dict[IndexedBase, float]]: subbed_eqs = [] subs_dict = subs_dict.copy() finished = False while not finished: next_eqs_to_sub = [] for eq in eqs_for_sub: subbed = Eq(eq.lhs, eq.rhs.xreplace(subs_dict)) if isinstance(subbed, (BooleanFalse, BooleanTrue)): # Both lhs and rhs was subbed. This means it is a calculated item which was # added to the forecast because it is being used as pct_of in other items # Simply don't solve the equation again as the result is already in the subs dict continue elif subbed.rhs.free_symbols == set(): # Equation is completely solved subbed_eqs.append(subbed) subs_dict[subbed.lhs] = subbed.rhs # use it in other solutions else: # Equation still has remaining symbols, handle on next loop next_eqs_to_sub.append(subbed) if not next_eqs_to_sub: finished = True if eqs_for_sub == next_eqs_to_sub: # Went through a loop without any equations changed. # Not going to make any more progress, so exit without complete substitution finished = True eqs_for_sub = next_eqs_to_sub.copy() return eqs_for_sub, subs_dict
[docs]def solve_equations( solve_eqs: List[Eq], subs_dict: Dict[IndexedBase, float], substitute: bool = True, round_results: bool = True, ): solutions_dict = subs_dict.copy() if substitute: solve_eqs, solutions_dict = get_solve_eqs_and_full_subs_dict( solve_eqs, solutions_dict ) solve_exprs = [] to_solve_for = [] for eq in solve_eqs: solve_exprs.append(eq.rhs - eq.lhs) to_solve_for.append(eq.lhs) to_solve_for = list(set(to_solve_for)) res_set = numpy_solve(solve_exprs, to_solve_for) if not res_set: raise ValueError("could not solve equations") solutions_dict.update(res_set) return solutions_dict
def _solve_eqs_with_plug_solutions( eqs: List[Eq], plug_solutions: Dict[IndexedBase, float], subs_dict: Dict[IndexedBase, float], forecast_dates: pd.DatetimeIndex, item_configs: List[ItemConfig], ) -> Dict[IndexedBase, float]: subs_dict = subs_dict.copy() subs_dict.update(plug_solutions) sub_eqs = [Eq(lhs, rhs) for lhs, rhs in subs_dict.items()] solutions_dict = solve_equations(eqs + sub_eqs, {}, substitute=False) return solutions_dict def _x_arr_to_plug_solutions( x: np.ndarray, plug_keys: Sequence[str], sympy_namespace: Dict[str, IndexedBase] ) -> Dict[IndexedBase, float]: x_arrs = np.split(x * PLUG_SCALE, len(plug_keys)) plug_dict = {key: pd.Series(x_arrs[i]) for i, key in enumerate(plug_keys)} # TODO: Is Expr or IndexedBase the correct type? plug_solutions = results_dict_to_sympy_dict(plug_dict, sympy_namespace) # type: ignore[arg-type] return plug_solutions def _symbolic_to_matrix(exprs: Sequence[Expr], variables: Sequence[Symbol]): """ Expr should be in the format of eq.lhs - eq.rhs Assuming that there exists numeric matrix A such that equation F = 0 is equivalent to linear equation Ax = b, this function returns tuple (A, b) """ A = [] b = [] for expr in exprs: coeffs = expand(expr).as_coefficients_dict() A.append([float(coeffs[x]) for x in variables]) b.append(-float(coeffs[1])) return np.array(A), np.array(b)
[docs]def numpy_solve(exprs: Sequence[Expr], variables: Sequence[Symbol]): a_arr, b_arr = _symbolic_to_matrix(exprs, variables) x = np.linalg.solve(a_arr, b_arr) solution_dict = {var: x[i] for i, var in enumerate(variables)} return solution_dict
def _get_indexed_symbols(expr: Union[Eq, Expr]) -> List[Indexed]: return [sym for sym in expr.free_symbols if isinstance(sym, Indexed)]