Skip to content

Commit

Permalink
feat: ✨ Update run log status for Notebook/function execution (#95)
Browse files Browse the repository at this point in the history
* feat: ✨ Update run log status for Notebook/function execution

* feat: ✨ Update run log status for Notebook/function execution
  • Loading branch information
vijayvammi committed Mar 19, 2023
1 parent 236109c commit 6efc9f9
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 1 deletion.
13 changes: 13 additions & 0 deletions magnus/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,19 @@ def put_run_log(self, run_log: RunLog, **kwargs):
"""
raise NotImplementedError

def update_run_log_status(self, run_id: str, status: str):
"""
Updates the status of the Run Log defined by the run_id
Args:
run_id (str): The run_id of the run
status (str): The new status of the run
"""
logger.info(f'Updating status of run_id {run_id} to {status}')
run_log = self.get_run_log_by_id(run_id, full=False)
run_log.status = status
self.put_run_log(run_log)

def get_parameters(self, run_id: str, **kwargs) -> dict: # pylint: disable=unused-argument
"""
Get the parameters from the Run log defined by the run_id
Expand Down
4 changes: 4 additions & 0 deletions magnus/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,10 @@ def execute_job(self, node: BaseNode):
self.run_log_store.add_step_log(step_log, self.run_id)
self.execute_node(node=node)

# Update the run log status
step_log = self.run_log_store.get_step_log(node._get_step_log_name(), self.run_id)
self.run_log_store.update_run_log_status(run_id=self.run_id, status=step_log.status)


class LocalContainerExecutor(BaseExecutor):
"""
Expand Down
5 changes: 4 additions & 1 deletion magnus/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,11 @@ def execute_nb_or_func(run_id, command: str, catalog_config: dict, configuratio

# Prepare for graph execution
mode_executor.prepare_for_node_execution()

logger.info('Executing the job')
mode_executor.execute_node(node=node)

# Update the status of the run log
step_log = mode_executor.run_log_store.get_step_log(node._get_step_log_name(), run_id)
mode_executor.run_log_store.update_run_log_status(run_id=run_id, status=step_log.status)

mode_executor.send_return_code()

0 comments on commit 6efc9f9

Please sign in to comment.