Output fixes
- Better handling of OutputAdapters inside pipeline.py
- Added an option to save only specific results at the end; this could
  become the default in the future
- Added some output tests
- Do not print self among the execute arguments in the debug log on errors
Carlo Antonio Venditti committed May 7, 2022
1 parent 8d6e78d commit 6d21edc
Showing 5 changed files with 209 additions and 52 deletions.
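For context, a minimal sketch of how the new save_results option is meant to be used, based only on the Pipeline and OutputAdapter signatures visible in this diff; the import paths, the CsvOutput adapter, and the MakeReport job are assumptions, not part of the commit.

```python
from yapp.core import Job, Pipeline  # assumed import path
from yapp.core.output_adapter import OutputAdapter  # assumed import path


class CsvOutput(OutputAdapter):
    """Hypothetical adapter used only for illustration."""

    def save(self, key, data):
        print(f"would save {key}: {data!r}")


class MakeReport(Job):
    """Hypothetical job returning a dict of named outputs."""

    def execute(self):
        return {"report": {"rows": 3}, "scratch": [1, 2, 3]}


pipeline = Pipeline(
    [MakeReport],
    name="demo",
    outputs=CsvOutput(),      # a single adapter now gets wrapped into a list
    save_results=["report"],  # only "report" is re-saved as a final result
)
```

During the run every output still goes through save; the entries listed in save_results are additionally passed to save_result once the pipeline has finished.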
3 changes: 1 addition & 2 deletions src/yapp/cli/__init__.py
@@ -87,14 +87,13 @@ def main():
config_parser.switch_workdir()
pipeline()
except Exception as error: # pylint: disable=broad-except

logging.exception(error)
logging.debug("pipeline.inputs: %s", pipeline.inputs.__repr__())
logging.debug("pipeline.outputs: %s", pipeline.outputs)
logging.debug("pipeline.job_list: %s", pipeline.job_list)
for job in pipeline.job_list:
args = inspect.getfullargspec(job.execute).args
logging.debug("%s.execute arguments: %s", job, args)
logging.debug("%s.execute arguments: %s", job, args[1:])
sys.exit(-1)


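The only behavioural change in this file is the args[1:] slice: inspect.getfullargspec on a method accessed through the class includes self, which is just noise in the error log. A small illustration (DemoJob is hypothetical):

```python
import inspect


class DemoJob:
    def execute(self, dataset, config):
        ...


args = inspect.getfullargspec(DemoJob.execute).args
print(args)      # ['self', 'dataset', 'config']
print(args[1:])  # ['dataset', 'config']  <- what the debug log prints now
```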
2 changes: 2 additions & 0 deletions src/yapp/cli/logs.py
@@ -135,6 +135,8 @@ def format(self, record):
if record.exc_info:
msg = "> " + self.formatException(record.exc_info)

# This could be done using levelno instead of marking the message itself, but then there
# would be no highlight marker in the file output, which has no color
if msg.startswith("> "):
head = self.get_color(record.levelno) + head
# msg = self.get_color(logging.DEBUG) + msg[2:] + self.get_color()
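The "> " prefix acts as a plain-text marker: the console formatter can colour lines that start with it, while the file handler, which has no colour support, still shows the marker. A self-contained sketch of the idea (not the project's actual formatter, whose get_color helper is only partially visible here):

```python
import logging

RED = "\033[31m"
RESET = "\033[0m"


class MarkerFormatter(logging.Formatter):
    """Prefix exception messages with '> ' and colour marked lines when allowed."""

    def __init__(self, use_color=True):
        super().__init__()
        self.use_color = use_color

    def format(self, record):
        msg = record.getMessage()
        if record.exc_info:
            # Same convention as the diff above: mark exception output with "> "
            msg = "> " + self.formatException(record.exc_info)
        if msg.startswith("> ") and self.use_color:
            msg = RED + msg + RESET
        return f"{record.levelname}: {msg}"
```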
46 changes: 43 additions & 3 deletions src/yapp/core/output_adapter.py
@@ -1,5 +1,6 @@
import logging
from abc import ABC, abstractmethod
from typing import Any


class OutputAdapter(ABC):
@@ -9,12 +10,51 @@ class OutputAdapter:
An output adapter represents a specific output destination
"""

@property
def name(self):
"""
Helper to return the name of the class
"""
return self.__class__.__name__

@abstractmethod
def save(self, key, data):
"""
Save data here
Save intermediate data here
Args:
key (str):
The key under which the data was stored in the dict returned by the Job, or the
Job's name if it did not return a dict.
data (Any):
The value stored under key in the Job's returned dict, or the Job's full
return value if it did not return a dict
"""

def empty(self, job_name):
"""
Override this if you wish to save something when a Job returns nothing.
Leave it as is if you prefer to ignore such cases.
Args:
job_name (str):
Name of the job returning None
"""
pass

def save_result(self, key, data):
"""
Save final result here
Leave it as is if you only use save.
"""
self.save(key, data)

def _save(self, key, data):
if data is None:
logging.debug('Empty output to %s for job "%s"', self.name, key)
return self.empty(key)
# data is not None: delegate to the adapter's save()
return self.save(key, data)

def __setitem__(self, key, data):
logging.debug('Saving output to %s: "%s"', self.__class__.__name__, key)
logging.debug('Saving output to %s: "%s"', self.name, key)
return self.save(key, data)

def _save_result(self, key, data):
logging.debug('Saving final output to %s: "%s"', self.name, key)
return self.save_result(key, data)
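With these hooks a concrete adapter only has to implement save; empty and save_result are optional overrides, and _save/_save_result are the entry points the pipeline calls. A hypothetical subclass as a sketch (the import path is assumed):

```python
import logging
import pathlib
import pickle

from yapp.core.output_adapter import OutputAdapter  # assumed import path


class PickleOutput(OutputAdapter):
    """Hypothetical adapter: pickle every output, mark final results."""

    def __init__(self, directory="out"):
        self.directory = pathlib.Path(directory)

    def save(self, key, data):
        # Intermediate (and, by default, final) outputs go to <directory>/<key>.pkl
        self.directory.mkdir(parents=True, exist_ok=True)
        with open(self.directory / f"{key}.pkl", "wb") as fp:
            pickle.dump(data, fp)

    def empty(self, job_name):
        # Called via _save() instead of save() when a Job returned None
        logging.info("nothing to save for %s", job_name)

    def save_result(self, key, data):
        # Entries requested through the pipeline's save_results get a distinct name
        self.save(f"result_{key}", data)
```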
136 changes: 98 additions & 38 deletions src/yapp/core/pipeline.py
@@ -1,9 +1,11 @@
import inspect
import logging
from datetime import datetime
from typing import Sequence

from .attr_dict import AttrDict
from .inputs import Inputs
from .job import Job
from .output_adapter import OutputAdapter


@@ -33,7 +35,45 @@ class Pipeline:

__nested_timed_calls = 0

def __init__(self, job_list, name="", inputs=None, outputs=None, **hooks):

def __enforce_list(self, value):
if value is None:
return []
if isinstance(value, set):
return list(value)
return value if isinstance(value, list) else [value]


def __init__(
self,
job_list: Sequence[type[Job]],
name: str = "",
inputs: Inputs | None = None,
outputs: Sequence[OutputAdapter] | OutputAdapter | None = None,
save_results: Sequence[str] | str | None = None,
**hooks,
):
"""__init__.
Args:
job_list:
List of Job classes to run (in the correct order) inside the pipeline
name:
Pipeline name
inputs:
Inputs for the pipeline
outputs:
Outputs for the pipeline
save_results:
Names of the outputs to save at the end of execution
**hooks:
Hooks to attach to the pipeline
"""
if name:
self.name = name
else:
@@ -49,7 +89,8 @@ def __init__(self, job_list, name="", inputs=None, outputs=None, **hooks):

# inputs and outputs
self.inputs = inputs if inputs else Inputs()
self.outputs = outputs if outputs else []
self.outputs = self.__enforce_list(outputs)
self.save_results = self.__enforce_list(save_results)
logging.debug("Inputs for %s: %s", self.name, repr(self.inputs))

# hooks
@@ -72,20 +113,18 @@ def __init__(self, job_list, name="", inputs=None, outputs=None, **hooks):

@property
def config(self):
""" Shortcut for configuration from inputs
"""
"""Shortcut for configuration from inputs"""
return self.inputs.config

@property
def job_name(self):
""" Shortcut for self.current_job.name which handles no current_job
"""
"""Shortcut for self.current_job.name which handles no current_job"""
if self.current_job:
return self.current_job.name
return None

def run_hook(self, hook_name):
""" Run all hooks for current event
"""Run all hooks for current event
A hook is just a function taking a pipeline as single argument
@@ -98,7 +137,7 @@ def run_hook(self, hook_name):
self.timed(f"{hook_name} hook", hook.__name__, hook, self)

def timed(self, typename, name, func, *args, **kwargs):
""" Runs a timed execution of a function, logging times
"""Runs a timed execution of a function, logging times
The first two parameters are used to specify the type and name of the entity to run.
@@ -138,8 +177,7 @@ def timed(self, typename, name, func, *args, **kwargs):
return out

def _run_job(self, job):
""" Execution of a single job
"""
"""Execution of a single job"""

# Get arguments used in the execute function
args = inspect.getfullargspec(job.execute).args[1:]
@@ -156,34 +194,51 @@ def _run_job(self, job):
logging.debug(
"%s returned %s",
job.name,
list(last_output.keys()) if last_output else last_output,
list(last_output.keys()) if isinstance(last_output, dict) else last_output,
)

self.run_hook("on_job_finish")

# save output and merge into inputs for next steps
if last_output:
logging.debug('saving last_output: %s len %s', type(last_output),
len(last_output))
self.save_output(job.__class__.__name__, last_output)
if isinstance(last_output, dict):
logging.debug(
"saving last_output: %s len %s",
type(last_output),
len(last_output),
)
for key in last_output:
self.save_output(key, last_output[key])
else:
if last_output is None:
logging.warning("> %s returned None", job.name)
# save using job name
self.save_output(job.name, last_output)
# replace last_output with dict to merge into inputs
last_output = {job.name: last_output}
# merge into inputs
try:
self.inputs.update(last_output)
except (TypeError, ValueError):
logging.warning("> Cannot merge output to inputs for job %s", job.name)
logging.info("Done saving %s outputs", job.name)

def save_output(self, name, data):
""" Save data to each output adapter
def save_output(self, name, data, results=False):
"""Save data to each output adapter
Args:
name:
name (str):
name to pass to the output adapters when saving the data
data:
data (Any):
data to save
"""

method = '_save' if not results else '_save_result'
for output in self.outputs:
output[name] = data
logging.info("saved %s output to %s", name, output)
getattr(output, method)(name, data)
logging.debug("saved %s output to %s", name, output)

def _run(self):
""" Runs all Pipeline's jobs
"""
"""Runs all Pipeline's jobs"""
self.run_hook("on_pipeline_start")

for job_class in self.job_list:
@@ -194,38 +249,43 @@ def _run(self):

self.run_hook("on_pipeline_finish")

def __call__(self, inputs=None, outputs=None, config=None):
""" Pipeline entrypoint
# should this be done here or before the hook?
for output_name in self.save_results:
self.save_output(output_name, self.inputs[output_name], results=True)

def __call__(
self,
inputs=None,
outputs=None,
config=None,
save_results: Sequence[str] | None = None,
):
"""Pipeline entrypoint
Sets up inputs, outputs and config (if specified) and runs the pipeline
"""
# Override inputs or outputs if specified
if inputs:
self.inputs = inputs
if outputs:
self.outputs = outputs

# FIXME define one type for outputs and just enforce it
# Eventually creating an Outputs class
if isinstance(self.outputs, set):
self.outputs = list(self.outputs)
self.outputs = self.__enforce_list(outputs)
if save_results:
self.save_results = self.__enforce_list(save_results)

if not isinstance(self.inputs, Inputs):
raise ValueError(f'{self.inputs} is not an Inputs object')
if not isinstance(self.outputs, list):
raise ValueError(f'{self.outputs} is not a list')
raise ValueError(f"{self.inputs} is not an Inputs object")

for i,output in enumerate(self.outputs):
for i, output in enumerate(self.outputs):
if isinstance(output, type):
self.outputs[i] = output = output()
if not isinstance(output, OutputAdapter):
raise ValueError(f'{output} is not an OutputAdapter')
raise ValueError(f"{output} is not an OutputAdapter")

# Check if something is missing
if not self.inputs:
logging.warning("Missing inputs for pipeline %s", self.name)
logging.warning("> Missing inputs for pipeline %s", self.name)
if not self.outputs:
logging.warning("Missing outputs for pipeline %s", self.name)
logging.warning("> Missing outputs for pipeline %s", self.name)

if not config:
config = {}
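To make the _run_job changes concrete, a sketch of the two return shapes it now distinguishes; the jobs are hypothetical, and the sketch assumes a Job subclass only needs an execute method, which this diff does not show:

```python
from yapp.core import Job, Pipeline  # assumed import path


class SplitData(Job):
    def execute(self):
        # dict return: each key/value pair is saved on its own and merged into inputs
        return {"train": [1, 2, 3], "test": [4]}


class CountTrain(Job):
    def execute(self, train):
        # non-dict return: saved and merged under the Job's name, here "CountTrain"
        return len(train)


# Only the "CountTrain" entry is re-saved through save_result() after the run.
pipeline = Pipeline([SplitData, CountTrain], save_results=["CountTrain"])
```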