From 6d21edc72021d0d70ffc7f1d1f64041cc4a75ffa Mon Sep 17 00:00:00 2001
From: Carlo Antonio Venditti
Date: Sat, 7 May 2022 15:23:19 +0200
Subject: [PATCH] Output fixes

- Better handling of OutputAdapters inside pipeline.py
- Added option to only save specific results at the end, which could become
  the default in the future
- Added some output tests
- Do not print self for execute arguments in the debug log on errors
---
 src/yapp/cli/__init__.py        |   3 +-
 src/yapp/cli/logs.py            |   2 +
 src/yapp/core/output_adapter.py |  46 ++++++++++-
 src/yapp/core/pipeline.py       | 136 +++++++++++++++++++++++---------
 tests/core/test_pipeline.py     |  74 ++++++++++++++---
 5 files changed, 209 insertions(+), 52 deletions(-)

diff --git a/src/yapp/cli/__init__.py b/src/yapp/cli/__init__.py
index 03ef2dd..b96d9d3 100644
--- a/src/yapp/cli/__init__.py
+++ b/src/yapp/cli/__init__.py
@@ -87,14 +87,13 @@ def main():
         config_parser.switch_workdir()
         pipeline()
     except Exception as error:  # pylint: disable=broad-except
-        logging.exception(error)
         logging.debug("pipeline.inputs: %s", pipeline.inputs.__repr__())
         logging.debug("pipeline.outputs: %s", pipeline.outputs)
         logging.debug("pipeline.job_list: %s", pipeline.job_list)
         for job in pipeline.job_list:
             args = inspect.getfullargspec(job.execute).args
-            logging.debug("%s.execute arguments: %s", job, args)
+            logging.debug("%s.execute arguments: %s", job, args[1:])
         sys.exit(-1)
diff --git a/src/yapp/cli/logs.py b/src/yapp/cli/logs.py
index b7ad1c8..5b01f47 100644
--- a/src/yapp/cli/logs.py
+++ b/src/yapp/cli/logs.py
@@ -135,6 +135,8 @@ def format(self, record):
         if record.exc_info:
             msg = "> " + self.formatException(record.exc_info)

+        # This could be done using levelno instead of encoding it in the message, but that
+        # way there would be no highlight in the file output, which has no color
         if msg.startswith("> "):
             head = self.get_color(record.levelno) + head
             # msg = self.get_color(logging.DEBUG) + msg[2:] + self.get_color()
diff --git a/src/yapp/core/output_adapter.py b/src/yapp/core/output_adapter.py
index 6684c7a..656e444 100644
--- a/src/yapp/core/output_adapter.py
+++ b/src/yapp/core/output_adapter.py
@@ -1,5 +1,6 @@
 import logging
 from abc import ABC, abstractmethod
+from typing import Any


 class OutputAdapter(ABC):
@@ -9,12 +10,51 @@ class OutputAdapter(ABC):
     An output adapter represents a specific output destination
     """

+    @property
+    def name(self):
+        """
+        Helper to return the name of the class
+        """
+        return self.__class__.__name__
+
     @abstractmethod
     def save(self, key, data):
         """
-        Save data here
+        Save intermediate data here
+
+        Args:
+            key (str):
+                The name used as the key in the dict returned by the Job or, if the Job
+                did not return a dictionary, the Job's name.
+            data (dict | Any):
+                Data returned from Job execution
         """
+
+    def empty(self, job_name):
+        """
+        Override this if you wish to save something when a Job returns nothing,
+        leave it as is if you prefer to ignore it.
+
+        Args:
+            job_name (str):
+                Name of the job returning None
+        """
+        pass
+
+    def save_result(self, key, data):
+        """
+        Save a final result here.
+        Leave it as is if you just want to use save.
+        """
+        self.save(key, data)
+
+    def _save(self, key, data):
+        if data is None:
+            logging.debug('Empty output to %s for job "%s"', self.name, key)
+            return self.empty(key)

-    def __setitem__(self, key, data):
-        logging.debug('Saving output to %s: "%s"', self.__class__.__name__, key)
+        logging.debug('Saving output to %s: "%s"', self.name, key)
         return self.save(key, data)
+
+    def _save_result(self, key, data):
+        logging.debug('Saving final output to %s: "%s"', self.name, key)
+        return self.save_result(key, data)
diff --git a/src/yapp/core/pipeline.py b/src/yapp/core/pipeline.py
index d7ef91b..a4bb145 100644
--- a/src/yapp/core/pipeline.py
+++ b/src/yapp/core/pipeline.py
@@ -1,9 +1,11 @@
 import inspect
 import logging
 from datetime import datetime
+from typing import Sequence

 from .attr_dict import AttrDict
 from .inputs import Inputs
+from .job import Job
 from .output_adapter import OutputAdapter
@@ -33,7 +35,45 @@ class Pipeline:

     __nested_timed_calls = 0

-    def __init__(self, job_list, name="", inputs=None, outputs=None, **hooks):
+
+    def __enforce_list(self, value):
+        if value is None:
+            return []
+        if isinstance(value, set):
+            return list(value)
+        return value if isinstance(value, list) else [value]
+
+
+    def __init__(
+        self,
+        job_list: Sequence[type[Job]],
+        name: str = "",
+        inputs: Inputs | None = None,
+        outputs: Sequence[OutputAdapter] | OutputAdapter | None = None,
+        save_results: Sequence[str] | str | None = None,
+        **hooks,
+    ):
+        """__init__.
+
+        Args:
+            job_list:
+                List of Job classes to run (in the correct order) inside the pipeline
+
+            name:
+                Pipeline name
+
+            inputs:
+                Inputs for the pipeline
+
+            outputs:
+                Outputs for the pipeline
+
+            save_results:
+                Names of the outputs to save at the end of execution
+
+            **hooks:
+                Hooks to attach to the pipeline
+        """
         if name:
             self.name = name
         else:
@@ -49,7 +89,8 @@ def __init__(self, job_list, name="", inputs=None, outputs=None, **hooks):

         # inputs and outputs
         self.inputs = inputs if inputs else Inputs()
-        self.outputs = outputs if outputs else []
+        self.outputs = self.__enforce_list(outputs)
+        self.save_results = self.__enforce_list(save_results)
         logging.debug("Inputs for %s: %s", self.name, repr(self.inputs))

         # hooks
@@ -72,20 +113,18 @@ def __init__(self, job_list, name="", inputs=None, outputs=None, **hooks):

     @property
     def config(self):
-        """ Shortcut for configuration from inputs
-        """
+        """Shortcut for configuration from inputs"""
         return self.inputs.config

     @property
     def job_name(self):
-        """ Shortcut for self.current_job.name which handles no current_job
-        """
+        """Shortcut for self.current_job.name which handles no current_job"""
         if self.current_job:
             return self.current_job.name
         return None

     def run_hook(self, hook_name):
-        """ Run all hooks for current event
+        """Run all hooks for current event

         A hook is just a function taking a pipeline as single argument
@@ -98,7 +137,7 @@ def run_hook(self, hook_name):
             self.timed(f"{hook_name} hook", hook.__name__, hook, self)

     def timed(self, typename, name, func, *args, **kwargs):
-        """ Runs a timed execution of a function, logging times
+        """Runs a timed execution of a function, logging times

         The first two parameters are used to specify the type and name of the entity to run.
@@ -138,8 +177,7 @@ def timed(self, typename, name, func, *args, **kwargs):
         return out

     def _run_job(self, job):
-        """ Execution of a single job
-        """
+        """Execution of a single job"""

         # Get arguments used in the execute function
         args = inspect.getfullargspec(job.execute).args[1:]
@@ -156,34 +194,51 @@ def _run_job(self, job):
         logging.debug(
             "%s returned %s",
             job.name,
-            list(last_output.keys()) if last_output else last_output,
+            list(last_output.keys()) if isinstance(last_output, dict) else last_output,
         )

         self.run_hook("on_job_finish")

         # save output and merge into inputs for next steps
-        if last_output:
-            logging.debug('saving last_output: %s len %s', type(last_output),
-                          len(last_output))
-            self.save_output(job.__class__.__name__, last_output)
+        if isinstance(last_output, dict):
+            logging.debug(
+                "saving last_output: %s len %s",
+                type(last_output),
+                len(last_output) if last_output is not None else "None",
+            )
+            for key in last_output:
+                self.save_output(key, last_output)
+        else:
+            if last_output is None:
+                logging.warning("> %s returned None", job.name)
+            # save using job name
+            self.save_output(job.name, last_output)
+            # replace last_output with a dict to merge into inputs
+            last_output = {job.name: last_output}
+
+        # merge into inputs
+        try:
             self.inputs.update(last_output)
+        except (TypeError, ValueError):
+            logging.warning("> Cannot merge output to inputs for job %s", job.name)
+
         logging.info("Done saving %s outputs", job.name)

-    def save_output(self, name, data):
-        """ Save data to each output adapter
+    def save_output(self, name, data, results=False):
+        """Save data to each output adapter

         Args:
-            name:
+            name (str):
                 name to pass to the output adapters when saving the data
-            data:
+            data (Any):
                 data to save
         """
+
+        method = '_save' if not results else '_save_result'
         for output in self.outputs:
-            output[name] = data
-            logging.info("saved %s output to %s", name, output)
+            getattr(output, method)(name, data)
+            logging.debug("saved %s output to %s", name, output)

     def _run(self):
-        """ Runs all Pipeline's jobs
-        """
+        """Runs all Pipeline's jobs"""
         self.run_hook("on_pipeline_start")

         for job_class in self.job_list:
@@ -194,8 +249,18 @@ def _run(self):

         self.run_hook("on_pipeline_finish")

-    def __call__(self, inputs=None, outputs=None, config=None):
-        """ Pipeline entrypoint
+        # should this be done here or before the hook?
+        for output_name in self.save_results:
+            self.save_output(output_name, self.inputs[output_name], results=True)
+
+    def __call__(
+        self,
+        inputs=None,
+        outputs=None,
+        config=None,
+        save_results: Sequence[str] | None = None,
+    ):
+        """Pipeline entrypoint

         Sets up inputs, outputs and config (if specified) and runs the pipeline
         """
@@ -203,29 +268,24 @@ def __call__(self, inputs=None, outputs=None, config=None):
         if inputs:
             self.inputs = inputs
         if outputs:
-            self.outputs = outputs
-
-        # FIXME define one type for outputs and just enforce it
-        # Eventually creating an Outputs class
-        if isinstance(self.outputs, set):
-            self.outputs = list(self.outputs)
+            self.outputs = self.__enforce_list(outputs)
+        if save_results:
+            self.save_results = self.__enforce_list(save_results)

         if not isinstance(self.inputs, Inputs):
-            raise ValueError(f'{self.inputs} is not an Inputs object')
-        if not isinstance(self.outputs, list):
-            raise ValueError(f'{self.outputs} is not a list')
+            raise ValueError(f"{self.inputs} is not an Inputs object")

-        for i,output in enumerate(self.outputs):
+        for i, output in enumerate(self.outputs):
             if isinstance(output, type):
                 self.outputs[i] = output = output()
             if not isinstance(output, OutputAdapter):
-                raise ValueError(f'{output} is not an OutputAdapter')
+                raise ValueError(f"{output} is not an OutputAdapter")

         # Check if something is missing
         if not self.inputs:
-            logging.warning("Missing inputs for pipeline %s", self.name)
+            logging.warning("> Missing inputs for pipeline %s", self.name)
         if not self.outputs:
-            logging.warning("Missing outputs for pipeline %s", self.name)
+            logging.warning("> Missing outputs for pipeline %s", self.name)

         if not config:
             config = {}
diff --git a/tests/core/test_pipeline.py b/tests/core/test_pipeline.py
index 91f1bb5..1027367 100644
--- a/tests/core/test_pipeline.py
+++ b/tests/core/test_pipeline.py
@@ -3,6 +3,7 @@
 from yapp import Pipeline, Job
 from yapp.adapters.utils import DummyInput, DummyOutput
 from yapp.core.inputs import Inputs
+from yapp.core.output_adapter import OutputAdapter


 class DummyJob(Job):
@@ -12,7 +13,7 @@ def execute(self):

 class DummyJob2(Job):
     def execute(self, a_value):
-        return {"a_value": a_value * 2}
+        return {"another_value": a_value * 2}


 def hook_factory(expected_job, expected_job_name):
@@ -34,24 +35,33 @@ def test_empty_pipeline():
     pipeline = Pipeline([])
     pipeline()

-
-def test_simple_pipeline():
-    pipeline = Pipeline([DummyJob], name="test_pipeline")
-
+def pipeline_common_asserts(pipeline, *args, **kwargs):
     assert pipeline.inputs is not None
     assert pipeline.outputs is not None
     assert pipeline.current_job is None

-    pipeline()
+    pipeline(*args, **kwargs)

     assert pipeline.inputs is not None
     assert pipeline.outputs is not None
     assert pipeline.config is pipeline.inputs.config

+    return pipeline
+
+
+def test_simple_pipeline():
+    pipeline = Pipeline([DummyJob], name="test_pipeline")
+    pipeline = pipeline_common_asserts(pipeline)
+
     assert "a_value" in pipeline.inputs

+    pipeline = Pipeline([DummyJob, DummyJob2], name="test_pipeline")
+    pipeline = pipeline_common_asserts(pipeline)
+
+    assert "another_value" in pipeline.inputs
+

 def test_types_outputs_pipeline():
     pipeline = Pipeline([DummyJob], name="test_pipeline")
@@ -65,9 +75,9 @@ def test_bad_inputs_pipeline():
         pipeline = Pipeline([DummyJob], inputs=22, name="test_pipeline")
         pipeline()

-    with pytest.raises(ValueError):
-        pipeline = Pipeline([DummyJob], name="test_pipeline")
-        pipeline(outputs=DummyOutput)  # not a list
+    # Doesn't raise an error
+    pipeline = Pipeline([DummyJob], name="test_pipeline")
+    pipeline(outputs=DummyOutput)  # not a list

     with pytest.raises(ValueError):
         pipeline = Pipeline([DummyJob], name="test_pipeline")
@@ -99,3 +109,49 @@ def test_hooks_pipeline():
         on_job_start=[*checker_hooks_dummy_job],
     )
     pipeline(inputs=inputs, outputs=[DummyOutput])
+
+
+
+class ReturnsNoneJob(Job):
+    def execute(self):
+        return None
+
+
+class PrintEmptyOutput(OutputAdapter):
+    def save(self, key, data):
+        pass
+
+    def empty(self, job_name):
+        print(f'empty {job_name} output', end='')
+
+
+def test_empty_output_pipeline(capfd):
+    pipeline = Pipeline([ReturnsNoneJob], name="test_pipeline")
+    pipeline(outputs=[PrintEmptyOutput])
+
+    out, err = capfd.readouterr()
+    assert out == "empty ReturnsNoneJob output"
+
+
+def test_ignore_empty_output_pipeline(capfd):
+    pipeline = Pipeline([ReturnsNoneJob], name="test_pipeline")
+    pipeline(outputs=[DummyOutput])
+
+    out, err = capfd.readouterr()
+    assert out == ""
+
+
+class PrintFinalOutput(OutputAdapter):
+    def save(self, key, data):
+        pass
+
+    def save_result(self, key, data):
+        print(key, '=', data, end='')
+
+
+def test_save_results_pipeline(capfd):
+    pipeline = Pipeline([DummyJob], outputs=PrintFinalOutput)
+    pipeline(save_results='a_value')

+    out, err = capfd.readouterr()
+    assert out == "a_value = -15"
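
Usage sketch (illustrative, not part of the diff above): how the save_results option and
the OutputAdapter save_result/empty hooks introduced in this patch could be used together.
The Doubler job and PrintOutput adapter below are hypothetical and only exercise the
interface added above.

    from yapp import Pipeline, Job
    from yapp.core.output_adapter import OutputAdapter


    class Doubler(Job):
        # hypothetical job returning a single named output
        def execute(self):
            return {"doubled": 21 * 2}


    class PrintOutput(OutputAdapter):
        # hypothetical adapter; save() is called for each intermediate output key
        def save(self, key, data):
            print(f"intermediate {key}: {data}")

        # called once per name listed in save_results, after the last job has run
        def save_result(self, key, data):
            print(f"final {key}: {data}")

        # called instead of save() when a job returns None
        def empty(self, job_name):
            print(f"{job_name} produced no output")


    pipeline = Pipeline([Doubler], name="example", outputs=[PrintOutput])
    pipeline(save_results=["doubled"])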