import itertools
from collections import defaultdict
from copy import deepcopy
from typing import List, Dict, Any, Tuple, Sequence, Optional, Iterator
from pyfileconf.basemodels.config import ConfigBase
from pyfileconf.logger.logger import logger
from pyfileconf.plugin import manager
from pyfileconf.runner.models.interfaces import RunnerArgs, IterativeResults, IterativeResult
from pyfileconf.sectionpath.sectionpath import SectionPath
class IterativeRunner:
    """
    Class that enables running functions/sections with different combinations of config updates,
    even across multiple pipeline managers

    Run one or multiple registered functions/sections multiple times, each time
    updating the config with a combination of the passed config updates.

    Aggregates the updates to each config, then takes the itertools product of all the config
    updates to run the functions with each combination of the configs.

    The cases themselves are obtained from the ``pyfileconf_iter_get_cases`` plugin hook,
    see :meth:`get_cases`.
    """
    # Section paths of the functions/sections which are run for every case
    run_items: List[SectionPath]
    # Config update dictionaries, exactly as passed to the constructor
    config_updates: Sequence[Dict[str, Any]]
    # Every case to run; each case is a tuple of config update dictionaries
    cases: List[Tuple[Dict[str, Any], ...]]
    # Most recent case which finished running (with defaults filled in), None before any run
    last_case: Optional[Tuple[Dict[str, Any], ...]] = None
    # Config values collected at construction time, keyed by full section path str
    defaults: Dict[str, Dict[str, Any]]

    def __init__(
        self,
        section_path_str_or_list: RunnerArgs,
        config_updates: Sequence[Dict[str, Any]],
        base_section_path_str: Optional[str] = None,
        strip_manager_from_iv: bool = False,
    ):
        """
        :param section_path_str_or_list: . separated name of path of function or section, or list thereof.
            similar to how a function would be imported. e.g. 'main.data.summarize.summary_func1'
            or when running multiple functions/sections, e.g. ['main.data', 'main.analysis.reg.1']
        :param config_updates: list of kwarg dictionaries which would normally be provided to .update
        :param base_section_path_str: section path str to put at beginning of all passed section paths
        :param strip_manager_from_iv: whether to remove manager name from any incoming item views
        """
        self.base_section_path_str = base_section_path_str
        self.strip_manager_from_iv = strip_manager_from_iv
        self.run_items = SectionPath.list_from_ambiguous(
            section_path_str_or_list,
            base_section_path_str=base_section_path_str,
            strip_manager_from_iv=strip_manager_from_iv,
        )
        self.config_updates = config_updates
        # Order matters: get_defaults reads self.cases
        self.cases = self.get_cases()
        self.defaults = self.get_defaults()

    def get_cases(self) -> List[Tuple[Dict[str, Any], ...]]:
        """
        Collect the cases to run from the registered plugin hooks

        The result of the ``pyfileconf_iter_get_cases`` hook is treated as a list of
        lists of cases and flattened into a single list, which is then passed
        to the ``pyfileconf_iter_modify_cases`` hook.

        :return: list of cases, each case being a tuple of config update dictionaries
        """
        logger.debug('Determining cases for IterativeRunner')
        cases_lol: List[
            List[Tuple[Dict[str, Any], ...]]
        ] = manager.plm.hook.pyfileconf_iter_get_cases(config_updates=self.config_updates, runner=self)
        cases = list(itertools.chain.from_iterable(cases_lol))
        # The return value of this hook is ignored, so any adjustment
        # has to be made to the ``cases`` list in place
        manager.plm.hook.pyfileconf_iter_modify_cases(cases=cases, runner=self)
        logger.debug(f'Got {cases} for IterativeRunner.cases')
        return cases

    def get_defaults(self) -> Dict[str, Dict[str, Any]]:
        """
        Collect the current config values for each section path which the cases update

        The section paths are taken from the first case only. Section paths for which
        the manager returns no config are left out of the result, as are all
        section paths when there are no cases at all.

        :return: dictionary where keys are full section path strs and values
            are copies of the config contents for that section path
        :raises ValueError: if called before ``cases`` has been set on the instance
        """
        logger.debug('Determining defaults for IterativeRunner')
        from pyfileconf import PipelineManager

        if not hasattr(self, 'cases'):
            raise ValueError('must set cases before calling get_defaults')

        # With no cases there are no section paths to look up, avoid IndexError on cases[0]
        case = self.cases[0] if self.cases else ()
        section_path_strs = [
            self._get_full_section_path_str(conf['section_path_str']) for conf in case
        ]
        defaults: Dict[str, Dict[str, Any]] = {}
        for sp_str in section_path_strs:
            pm = PipelineManager.get_manager_by_section_path_str(sp_str)
            sp = SectionPath(sp_str)
            # Drop the first path component before asking the manager for the config
            # (presumably it is the manager name - TODO confirm)
            relative_section_path_str = SectionPath(".".join(sp[1:])).path_str
            config = pm.config.get(relative_section_path_str)
            if config is not None:
                # Unpack into a new dict so the stored defaults are a separate (shallow) copy
                defaults[sp_str] = {**config}
        logger.debug(f'Got {defaults} for IterativeRunner.defaults')
        return defaults

    def _fill_case_with_defaults(self, case: Tuple[Dict[str, Any], ...]) -> Tuple[Dict[str, Any], ...]:
        """
        Build a copy of the case where each config dict also carries the collected defaults

        :param case: tuple of config update dictionaries
        :return: tuple of new dictionaries, values from the case take precedence over defaults
        """
        new_confs: List[Dict[str, Any]] = []
        for conf in case:
            sp_str = self._get_full_section_path_str(conf['section_path_str'])
            # get_defaults skips section paths without a config, so there
            # may be no entry for this section path
            defaults = self.defaults.get(sp_str, {})
            new_confs.append({**defaults, **conf})
        return tuple(new_confs)

    def _get_full_section_path_str(self, section_path_str: str) -> str:
        """
        Put the base section path, if one was passed to the constructor, in front of the section path

        :param section_path_str: section path str as found in a config update dictionary
        :return: section path str including the base section path
        """
        if not self.base_section_path_str:
            return section_path_str
        return SectionPath.join(self.base_section_path_str, section_path_str).path_str

    def run(self, collect_results: bool = True) -> IterativeResults:
        """
        Run the run items once for every case for which :meth:`should_run` is True

        :param collect_results: Whether to aggregate and return results, set to False to save memory
            if results are stored in some other way
        :return: list of (case, result) tuples in the order the cases were run,
            an empty list if collect_results is False
        """
        logger.info(f'Running {self.run_items} with cases')
        all_results: List[IterativeResult] = []
        for case in self.cases:
            if not self.should_run(case):
                continue
            result = self._run_case(case)
            if collect_results:
                # The case is reported as it appears in self.cases, without defaults filled in
                in_out_tup = (case, result)
                all_results.append(in_out_tup)
        logger.debug(f'Finished running {self.run_items} with cases {self.cases}')
        return all_results

    def run_gen(self) -> Iterator[IterativeResult]:
        """
        Generator version of :meth:`run`, runs each case only when the next item is requested

        :return: iterator of (case, result) tuples
        """
        logger.info(f'Running {self.run_items} with cases')
        for case in self.cases:
            if not self.should_run(case):
                continue
            result = self._run_case(case)
            in_out_tup = (case, result)
            yield in_out_tup

    def _run_case(self, case: Tuple[Dict[str, Any], ...]) -> Any:
        """
        Run all the run items for a single case

        :param case: tuple of config update dictionaries
        :return: result of :meth:`_run`
        """
        case_with_defaults = self._fill_case_with_defaults(case)
        # Presumably the hook implementations apply the config updates - TODO confirm
        manager.plm.hook.pyfileconf_iter_update_for_case(case=case_with_defaults, runner=self)
        result = self._run()
        # Only recorded once the run has completed, so it is not updated if the run raises
        self.last_case = case_with_defaults
        return result

    def _run(self) -> Any:
        """
        Run each of the run items with its pipeline manager

        :return: the single result when there is exactly one run item,
            otherwise a list of results in the order of the run items
        """
        from pyfileconf.main import PipelineManager

        results = []
        for sp in self.run_items:
            # Look up appropriate manager and run it
            pm = PipelineManager.get_manager_by_section_path_str(sp.path_str)
            # The manager is passed the path without its first component
            relative_section_path_str = SectionPath(".".join(sp[1:])).path_str
            result = pm.run(relative_section_path_str)
            results.append(result)
        if len(results) == 1:
            return results[0]
        return results

    def should_run(self, case: Tuple[Dict[str, Any], ...]) -> bool:
        """
        Hook which can be overridden in subclass to determine whether
        case should be run

        The default hook always returns True, the case should be run.

        :param case: Tuple of config dicts being updated
        :return: True if the case should be run, False to skip it
        """
        return True
def get_config_product(
    config_dicts: Sequence[Dict[str, Any]]
) -> List[Tuple[Dict[str, Any], ...]]:
    """
    Get every combination of config updates which takes one update per section path

    The config dictionaries are grouped by their ``section_path_str``, then the
    itertools product of the groups is taken.

    :param config_dicts: config update dictionaries, each containing a ``section_path_str`` key
    :return: list of tuples, each tuple holding one config dictionary from each group

    :Examples:

        >>> cd = dict(
        ...     section_path_str='abc',
        ...     a=10
        ... )
        >>> cd2 = dict(
        ...     section_path_str='abc',
        ...     a=20
        ... )
        >>> cd3 = dict(
        ...     section_path_str='def',
        ...     a=20
        ... )
        >>> cds = [cd, cd2, cd3]
        >>> result = get_config_product(cds)
        >>> result == [
        ...     ({'section_path_str': 'abc', 'a': 10}, {'section_path_str': 'def', 'a': 20}),
        ...     ({'section_path_str': 'abc', 'a': 20}, {'section_path_str': 'def', 'a': 20}),
        ... ]
        True

    """
    options_per_section_path = _get_configs_by_section_path_dict(config_dicts).values()
    return [combination for combination in itertools.product(*options_per_section_path)]
def _get_configs_by_section_path_dict(
    config_dicts: Sequence[Dict[str, Any]]
) -> Dict[str, List[Dict[str, Any]]]:
    """
    Group config update dictionaries by their ``section_path_str``

    Every dictionary is deep-copied before being grouped, so modifying the
    returned dictionaries does not affect the ones which were passed in.
    Within a group, the dictionaries keep the order in which they were passed.

    :param config_dicts: config update dictionaries, each containing a ``section_path_str`` key
    :return: plain dictionary where keys are section path strs and values are lists
        of the config dictionaries which have that section path str
    :raises KeyError: if a config dictionary does not have a ``section_path_str`` key

    :Examples:

        >>> cd = dict(
        ...     section_path_str='abc',
        ...     a=10
        ... )
        >>> cd2 = dict(
        ...     section_path_str='abc',
        ...     a=20
        ... )
        >>> cd3 = dict(
        ...     section_path_str='def',
        ...     a=20
        ... )
        >>> cds = [cd, cd2, cd3]
        >>> result = _get_configs_by_section_path_dict(cds)
        >>> result == {
        ...     'abc': [{'section_path_str': 'abc', 'a': 10},
        ...             {'section_path_str': 'abc', 'a': 20}],
        ...     'def': [{'section_path_str': 'def', 'a': 20}],
        ... }
        True

    """
    # A plain dict is built (rather than returning a defaultdict) so that looking up
    # an unknown section path in the result raises KeyError instead of silently
    # inserting an empty list
    combined_dicts: Dict[str, List[Dict[str, Any]]] = {}
    for config_dict in config_dicts:
        conf = deepcopy(config_dict)
        sp = conf["section_path_str"]
        combined_dicts.setdefault(sp, []).append(conf)
    return combined_dicts