Codice sorgente per aziona.cli.parser.v1.driver

import copy
from dataclasses import dataclass, field
from distutils.version import LooseVersion
from time import sleep
from typing import Dict, Union

from aziona.core import commands, io, text
from aziona.core.conf import errors, session, settings


[documenti]@dataclass class ParserOptionsStructure: interpolation: bool = True session_clean_before: bool = False session_clean_after: bool = False
[documenti]@dataclass class ParserTargetOptionsStructure: allow_failure_stage: list = field(default_factory=list) allow_failure_before: list = field(default_factory=list) allow_failure_after: list = field(default_factory=list)
[documenti]@dataclass class ParserRepeatStructure: count: int = 1 sleep: float = 0.0
[documenti]@dataclass class ParserStageStructure: module: str type: str args: Union[str, dict] = field(default_factory=dict) session: dict = field(default_factory=dict) before: dict = field(default_factory=dict) after: dict = field(default_factory=dict) repeat: ParserRepeatStructure = field(default_factory=ParserRepeatStructure)
[documenti]@dataclass class ParserTargetStructure: stages: Dict[str, ParserStageStructure] env: dict = field(default_factory=dict) before: dict = field(default_factory=dict) after: dict = field(default_factory=dict) repeat: ParserRepeatStructure = field(default_factory=ParserRepeatStructure) options: ParserTargetOptionsStructure = field( default_factory=ParserTargetOptionsStructure )
[documenti]class ParserEgine(object): targets: Dict[str, ParserTargetStructure] options: ParserOptionsStructure = field(default_factory=ParserOptionsStructure) env: dict = field(default_factory=dict) version: LooseVersion = LooseVersion("1") def __init__(self, raw: dict): self.options = self._process_options(data=raw.get("options", {})) self.env = self._make_interpolation(data=raw.get("env", {})) self._process_targets(data=raw.get("targets", {})) def __str__(self) -> str: return str(self.__dict__) def _process_options(self, data: dict): return ParserOptionsStructure(**data) def _make_interpolation(self, data, from_dict: dict = {}): if self.options.interpolation is True: return text.interpolation_vars(data, from_dict) return data def _make_args(self, data) -> str: if isinstance(data, (str, bool, int, float)): return str(data) if isinstance(data, (list, tuple)): return " ".join([str(item) for item in data]) # @TODO trovare medoto migliore if isinstance(data, dict): return " ".join( [ opt + (" " if opt not in ["--action-args", "--jq-query"] else "='") + (self._make_args(item) if item else "") + ("" if opt not in ["--action-args", "--jq-query"] else "'") for opt, item in data.items() ] ) return "" def _process_targets(self, data: dict): def _process_target_stages(data: dict): def _copy_from(stage_name: str, stage_data: str): try: cp_target_name, cp_stage_name = stage_data.split(".") except Exception as e: io.exception( e, "Valore non corretto dello stage '%s': %s.\nFormato corretto: nome_target.nome_stage " % (stage_name, stage_data), ) target = self.targets.get(cp_target_name, None) if target is None: raise errors.CriticalError( message="Target %s non trovato" % target_name ) if not target.stages.get(cp_stage_name, None): raise errors.CriticalError( message="Stage %s non trovato nel target %s" % (cp_stage_name, cp_target_name) ) stage = copy.deepcopy(target.stages.get(cp_stage_name)) return {stage_name: stage} def _make_from(stage_name: str, stage_data: dict): type = stage_data.get( "type", settings.getconst("PARSER_INTERPRETER").get("default") ) return { stage_name: ParserStageStructure( type=type, module=stage_data.get("module"), args=stage_data.get("args", ""), repeat=ParserRepeatStructure(**stage_data.get("repeat", {})), session=stage_data.get("session", []), before=stage_data.get("before", {}), after=stage_data.get("after", {}), ) } stages = {} for stage_name, stage_data in data.items(): if isinstance(stage_data, str): stages.update(_copy_from(stage_name, stage_data)) continue if isinstance(stage_data, dict): stages.update(_make_from(stage_name, stage_data)) continue raise errors.CriticalError(message="Stage %s non valido" % stage_name) return stages def _process_target_options(data: dict): return ParserTargetOptionsStructure(**data) def _process_target_repeat(data: dict): return ParserRepeatStructure(**data) def _process_target_env(data: dict): return {**self.env.copy(), **self._make_interpolation(data, self.env)} self.targets = {} for target_name, target_raw in data.items(): target_options = _process_target_options(target_raw.get("options", {})) target_repeat = _process_target_repeat(target_raw.get("repeat", {})) target_env = _process_target_env(target_raw.get("env", {})) target_stages = _process_target_stages(target_raw.get("stages")) target_before = self._make_interpolation( target_raw.get("before", {}), target_env ) target_after = self._make_interpolation( target_raw.get("after", {}), target_env ) self.targets[target_name] = ParserTargetStructure( stages=target_stages, options=target_options, env=target_env, before=target_before, after=target_after, repeat=target_repeat, ) def _run(self, command: str, allow_failure: bool, env: dict = {}): try: io.debug("Esecuzione comando: %s" % command) commands.exec(command, env=settings.environ(**env)) except Exception as e: message = "Options allow_failure: %s\nComando: %s\n%s" % ( ("ATTIVO" if allow_failure else "DISATTIVO"), command, str(e), ) if allow_failure is True: io.error(message) else: io.critical(message) def _exec_action(self, actions: dict, allow_failure, env: dict = {}): for act_name, act_value in actions.items(): io.step("action: '%s'" % act_name, 1) if isinstance(allow_failure, list): allow_failure = act_name in allow_failure self._run(command=act_value, allow_failure=allow_failure, env=env) def _exec_stage(self, target: ParserTargetStructure): if not isinstance(target, ParserTargetStructure): pass for stage_name, stage_value in target.stages.items(): for counter in range(stage_value.repeat.count): io.step("Stage '%s'" % stage_name) io.debug(f"Repeat counter: {counter+1}") target.env["REPEAT_COUNT_INDEX_STAGE"] = str(counter) # Caricamento sessione temporanea stage_session = {**target.env} for key in stage_value.session: stage_session.update(**session.get(key)[key]) # Interpolazione dei valori usando l'env costruito ad-hoc per lo stage: # ENV HOST -> ENV .aziona.yml -> ENV TARGET .aziona.yml -> SESSIONE stage_module = self._make_interpolation( stage_value.module, stage_session ) stage_args = self._make_args( self._make_interpolation(stage_value.args, stage_session) ) stage_command = settings.const.get_interpreter(stage_value.type).format( module=stage_module, args=stage_args ) stage_before = self._make_interpolation( stage_value.before, stage_session ) stage_after = self._make_interpolation(stage_value.after, stage_session) self._exec_action( actions=stage_before, allow_failure=target.options.allow_failure_before, env=stage_session, ) if isinstance(target.options.allow_failure_stage, list): allow_failure = stage_name in target.options.allow_failure_stage else: allow_failure = target.options.allow_failure_stage io.step("modulo: " + stage_value.module, 1) self._run( command=stage_command, allow_failure=allow_failure, env=stage_session, ) self._exec_action( actions=stage_after, allow_failure=target.options.allow_failure_after, env=stage_session, ) sleep(stage_value.repeat.sleep) def _clean_session(self, flag: bool): if flag is True: io.info("Session cleaned") session.clean()
[documenti] def main(self, *targets): self._clean_session(flag=self.options.session_clean_before) for name in targets: if name not in self.targets.keys(): io.warning("Target %s non trovato nel template" % name) continue target = self.targets.get(name) for counter in range(target.repeat.count): target.env["REPEAT_COUNT_INDEX_TARGET"] = str(counter) io.debug(f"Target {name} - repeat counter: {counter+1}") self._exec_action( actions=target.before, allow_failure=target.options.allow_failure_before, env=target.env, ) self._exec_stage(target) self._exec_action( actions=target.after, allow_failure=target.options.allow_failure_after, env=target.env, ) sleep(target.repeat.sleep) # default 0 self._clean_session(flag=self.options.session_clean_after)