From f4fc7db5a89e613761cdc6880f1e134367ac47d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Mon, 16 Sep 2024 12:51:38 +0200 Subject: [PATCH] Fix problem with lineage and conditional execution --- .../workflows/execution_engine/v1/core.py | 2 +- .../execution_engine/v1/executor/core.py | 7 +- .../hosted_platform_tests/test_workflows.py | 2 +- .../test_workflow_endpoints.py | 2 +- .../test_florence2.py | 3 +- ...rkflow_with_branching_affecting_lineage.py | 108 ++++++++++++++++++ 6 files changed, 116 insertions(+), 8 deletions(-) create mode 100644 tests/workflows/integration_tests/execution/test_workflow_with_branching_affecting_lineage.py diff --git a/inference/core/workflows/execution_engine/v1/core.py b/inference/core/workflows/execution_engine/v1/core.py index 20426d862..419ed8de1 100644 --- a/inference/core/workflows/execution_engine/v1/core.py +++ b/inference/core/workflows/execution_engine/v1/core.py @@ -17,7 +17,7 @@ validate_runtime_input, ) -EXECUTION_ENGINE_V1_VERSION = Version("1.1.0") +EXECUTION_ENGINE_V1_VERSION = Version("1.1.1") class ExecutionEngineV1(BaseExecutionEngine): diff --git a/inference/core/workflows/execution_engine/v1/executor/core.py b/inference/core/workflows/execution_engine/v1/executor/core.py index 521bf6503..69750e8ae 100644 --- a/inference/core/workflows/execution_engine/v1/executor/core.py +++ b/inference/core/workflows/execution_engine/v1/executor/core.py @@ -154,8 +154,9 @@ def run_simd_step_in_batch_mode( step_input = execution_data_manager.get_simd_step_input(step_selector=step_selector) if not step_input.indices: # no inputs - discarded either by conditional exec or by not accepting empty - return None - outputs = step_instance.run(**step_input.parameters) + outputs = [] + else: + outputs = step_instance.run(**step_input.parameters) execution_data_manager.register_simd_step_output( step_selector=step_selector, indices=step_input.indices, @@ -175,8 +176,6 @@ def run_simd_step_in_non_batch_mode( result = step_instance.run(**input_definition.parameters) results.append(result) indices.append(input_definition.index) - if not indices: - return None execution_data_manager.register_simd_step_output( step_selector=step_selector, indices=indices, diff --git a/tests/inference/hosted_platform_tests/test_workflows.py b/tests/inference/hosted_platform_tests/test_workflows.py index 2e82f442d..5715f57c7 100644 --- a/tests/inference/hosted_platform_tests/test_workflows.py +++ b/tests/inference/hosted_platform_tests/test_workflows.py @@ -127,7 +127,7 @@ def test_get_versions_of_execution_engine(object_detection_service_url: str) -> # then response.raise_for_status() response_data = response.json() - assert response_data["versions"] == ["1.1.0"] + assert response_data["versions"] == ["1.1.1"] FUNCTION = """ diff --git a/tests/inference/integration_tests/test_workflow_endpoints.py b/tests/inference/integration_tests/test_workflow_endpoints.py index 83f415a98..d9453c0f3 100644 --- a/tests/inference/integration_tests/test_workflow_endpoints.py +++ b/tests/inference/integration_tests/test_workflow_endpoints.py @@ -691,7 +691,7 @@ def test_get_versions_of_execution_engine(server_url: str) -> None: # then response.raise_for_status() response_data = response.json() - assert response_data["versions"] == ["1.1.0"] + assert response_data["versions"] == ["1.1.1"] def test_getting_block_schema_using_get_endpoint(server_url) -> None: diff --git a/tests/inference/models_predictions_tests/test_florence2.py b/tests/inference/models_predictions_tests/test_florence2.py index ad7602df3..574410145 100644 --- a/tests/inference/models_predictions_tests/test_florence2.py +++ b/tests/inference/models_predictions_tests/test_florence2.py @@ -1,6 +1,7 @@ +import numpy as np import pytest + from inference.models.florence2 import Florence2 -import numpy as np @pytest.mark.slow diff --git a/tests/workflows/integration_tests/execution/test_workflow_with_branching_affecting_lineage.py b/tests/workflows/integration_tests/execution/test_workflow_with_branching_affecting_lineage.py new file mode 100644 index 000000000..8178de150 --- /dev/null +++ b/tests/workflows/integration_tests/execution/test_workflow_with_branching_affecting_lineage.py @@ -0,0 +1,108 @@ +import numpy as np + +from inference.core.env import WORKFLOWS_MAX_CONCURRENT_STEPS +from inference.core.managers.base import ModelManager +from inference.core.workflows.core_steps.common.entities import StepExecutionMode +from inference.core.workflows.execution_engine.core import ExecutionEngine + +PROBLEMATIC_WORKFLOW = { + "version": "1.0", + "inputs": [{"type": "WorkflowImage", "name": "image"}], + "steps": [ + { + "type": "ObjectDetectionModel", + "name": "general_detection", + "image": "$inputs.image", + "model_id": "yolov8n-640", + "class_filter": ["dog"], + }, + { + "type": "ContinueIf", + "name": "continue_if", + "condition_statement": { + "type": "StatementGroup", + "statements": [ + { + "type": "BinaryStatement", + "left_operand": { + "type": "DynamicOperand", + "operand_name": "prediction", + "operations": [{"type": "SequenceLength"}], + }, + "comparator": {"type": "(Number) =="}, + "right_operand": { + "type": "StaticOperand", + "value": 5, + }, + } + ], + }, + "evaluation_parameters": { + "prediction": "$steps.general_detection.predictions" + }, + "next_steps": ["$steps.cropping"], + }, + { + "type": "Crop", + "name": "cropping", + "image": "$inputs.image", + "predictions": "$steps.general_detection.predictions", + }, + { + "type": "ClassificationModel", + "name": "breds_classification", + "image": "$steps.cropping.crops", + "model_id": "dog-breed-xpaq6/1", + }, + ], + "outputs": [ + { + "type": "JsonField", + "name": "predictions", + "selector": "$steps.breds_classification.predictions", + }, + ], +} + + +def test_workflow_with_flow_control_eliminating_step_changing_lineage( + model_manager: ModelManager, + dogs_image: np.ndarray, +) -> None: + """ + This test case covers bug that in Execution Engine versions <=1.1.0. + The bug was not registering outputs from steps given no inputs provided + (inputs may have been filtered by flow-control steps). As a result, + given that step changes dimensionality (registers new data lineage) - + registration could not happen and downstream steps was raising error: + "Lineage ['', 'XXX'] not found. [...]" + """ + # given + workflow_init_parameters = { + "workflows_core.model_manager": model_manager, + "workflows_core.api_key": None, + "workflows_core.step_execution_mode": StepExecutionMode.LOCAL, + } + execution_engine = ExecutionEngine.init( + workflow_definition=PROBLEMATIC_WORKFLOW, + init_parameters=workflow_init_parameters, + max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS, + ) + + # when + result = execution_engine.run( + runtime_parameters={ + "image": [dogs_image], + } + ) + + # then + assert isinstance(result, list), "Expected result to be list" + assert len(result) == 1, "1 image provided, so 1 output elements expected" + assert result[0].keys() == { + "predictions" + }, "Expected all declared outputs to be delivered for first result" + assert len([e for e in result[0]["predictions"] if e]) == 0, ( + "Expected no predictions, due to conditional execution applied, effectively preventing " + "`cropping` step from running" + )