From b82fb7400bef686c3d1afa1780f95ec0ebb55c3d Mon Sep 17 00:00:00 2001 From: qidewenwhen <32910701+qidewenwhen@users.noreply.github.com> Date: Thu, 21 Mar 2024 11:22:07 -0700 Subject: [PATCH] fix: Create workflow module scoped sagemaker_session to resolve test race condition (#4518) --- src/sagemaker/session.py | 4 +- tests/integ/sagemaker/workflow/conftest.py | 65 ++++++++++ .../sagemaker/workflow/test_experiment.py | 37 ++---- .../test_model_create_and_registration.py | 91 +++++++------- .../workflow/test_pipeline_var_behaviors.py | 11 +- tests/integ/sagemaker/workflow/test_retry.py | 31 +---- .../workflow/test_selective_execution.py | 22 +--- .../sagemaker/workflow/test_step_decorator.py | 114 ++++++++++-------- .../integ/sagemaker/workflow/test_workflow.py | 74 +++++------- .../workflow/test_workflow_with_clarify.py | 28 ++--- 10 files changed, 237 insertions(+), 240 deletions(-) create mode 100644 tests/integ/sagemaker/workflow/conftest.py diff --git a/src/sagemaker/session.py b/src/sagemaker/session.py index 8d72051cc0..ee0a63358e 100644 --- a/src/sagemaker/session.py +++ b/src/sagemaker/session.py @@ -189,7 +189,7 @@ def __init__( sagemaker_runtime_client=None, sagemaker_featurestore_runtime_client=None, default_bucket=None, - settings=SessionSettings(), + settings=None, sagemaker_metrics_client=None, sagemaker_config: dict = None, default_bucket_prefix: str = None, @@ -260,7 +260,7 @@ def __init__( self.resource_group_tagging_client = None self._config = None self.lambda_client = None - self.settings = settings + self.settings = settings if settings else SessionSettings() self._initialize( boto_session=boto_session, diff --git a/tests/integ/sagemaker/workflow/conftest.py b/tests/integ/sagemaker/workflow/conftest.py new file mode 100644 index 0000000000..ecb33bc261 --- /dev/null +++ b/tests/integ/sagemaker/workflow/conftest.py @@ -0,0 +1,65 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +from __future__ import absolute_import + +import os + +import pytest +from botocore.config import Config + +from tests.integ import DATA_DIR +from sagemaker import Session, get_execution_role + +CUSTOM_S3_OBJECT_KEY_PREFIX = "session-default-prefix" + + +# Create a sagemaker_session in workflow scope to prevent race condition +# with other tests. Some other tests may change the session `settings`. +@pytest.fixture(scope="module") +def sagemaker_session_for_pipeline( + sagemaker_client_config, + boto_session, +): + sagemaker_client_config.setdefault("config", Config(retries=dict(max_attempts=10))) + sagemaker_client = ( + boto_session.client("sagemaker", **sagemaker_client_config) + if sagemaker_client_config + else None + ) + + return Session( + boto_session=boto_session, + sagemaker_client=sagemaker_client, + sagemaker_config={}, + default_bucket_prefix=CUSTOM_S3_OBJECT_KEY_PREFIX, + ) + + +@pytest.fixture(scope="module") +def smclient(sagemaker_session): + return sagemaker_session.boto_session.client("sagemaker") + + +@pytest.fixture(scope="module") +def role(sagemaker_session_for_pipeline): + return get_execution_role(sagemaker_session_for_pipeline) + + +@pytest.fixture(scope="module") +def region_name(sagemaker_session_for_pipeline): + return sagemaker_session_for_pipeline.boto_session.region_name + + +@pytest.fixture(scope="module") +def script_dir(): + return os.path.join(DATA_DIR, "sklearn_processing") diff --git a/tests/integ/sagemaker/workflow/test_experiment.py b/tests/integ/sagemaker/workflow/test_experiment.py index 35828a69cd..ae98cd349c 100644 --- a/tests/integ/sagemaker/workflow/test_experiment.py +++ b/tests/integ/sagemaker/workflow/test_experiment.py @@ -19,7 +19,6 @@ from tests.integ.sagemaker.workflow.helpers import wait_pipeline_execution from sagemaker.processing import ProcessingInput -from sagemaker.session import get_execution_role from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.dataset_definition.inputs import DatasetDefinition, AthenaDatasetDefinition from sagemaker.workflow.execution_variables import ExecutionVariables @@ -33,33 +32,13 @@ from tests.integ import DATA_DIR -@pytest.fixture(scope="module") -def region_name(sagemaker_session): - return sagemaker_session.boto_session.region_name - - -@pytest.fixture(scope="module") -def role(sagemaker_session): - return get_execution_role(sagemaker_session) - - -@pytest.fixture(scope="module") -def script_dir(): - return os.path.join(DATA_DIR, "sklearn_processing") - - @pytest.fixture def pipeline_name(): return f"my-pipeline-{int(time.time() * 10**7)}" @pytest.fixture -def smclient(sagemaker_session): - return sagemaker_session.boto_session.client("sagemaker") - - -@pytest.fixture -def athena_dataset_definition(sagemaker_session): +def athena_dataset_definition(sagemaker_session_for_pipeline): return DatasetDefinition( local_path="/opt/ml/processing/input/add", data_distribution_type="FullyReplicated", @@ -69,7 +48,7 @@ def athena_dataset_definition(sagemaker_session): database="default", work_group="workgroup", query_string='SELECT * FROM "default"."s3_test_table_$STAGE_$REGIONUNDERSCORED";', - output_s3_uri=f"s3://{sagemaker_session.default_bucket()}/add", + output_s3_uri=f"s3://{sagemaker_session_for_pipeline.default_bucket()}/add", output_format="JSON", output_compression="GZIP", ), @@ -77,7 +56,7 @@ def athena_dataset_definition(sagemaker_session): def test_pipeline_execution_with_default_experiment_config( - sagemaker_session, + sagemaker_session_for_pipeline, smclient, role, sklearn_latest_version, @@ -99,7 +78,7 @@ def test_pipeline_execution_with_default_experiment_config( instance_type=cpu_instance_type, instance_count=instance_count, command=["python3"], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, base_job_name="test-sklearn", ) @@ -113,7 +92,7 @@ def test_pipeline_execution_with_default_experiment_config( name=pipeline_name, parameters=[instance_count], steps=[step_sklearn], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -142,7 +121,7 @@ def test_pipeline_execution_with_default_experiment_config( def test_pipeline_execution_with_custom_experiment_config( - sagemaker_session, + sagemaker_session_for_pipeline, smclient, role, sklearn_latest_version, @@ -164,7 +143,7 @@ def test_pipeline_execution_with_custom_experiment_config( instance_type=cpu_instance_type, instance_count=instance_count, command=["python3"], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, base_job_name="test-sklearn", ) @@ -185,7 +164,7 @@ def test_pipeline_execution_with_custom_experiment_config( trial_name=Join(on="-", values=["my-trial", ExecutionVariables.PIPELINE_EXECUTION_ID]), ), steps=[step_sklearn], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: diff --git a/tests/integ/sagemaker/workflow/test_model_create_and_registration.py b/tests/integ/sagemaker/workflow/test_model_create_and_registration.py index 75e086bcfe..0733649cb2 100644 --- a/tests/integ/sagemaker/workflow/test_model_create_and_registration.py +++ b/tests/integ/sagemaker/workflow/test_model_create_and_registration.py @@ -35,7 +35,6 @@ Model, ModelMetrics, MetricsSource, - get_execution_role, ) from sagemaker import FileSource, utils from sagemaker.inputs import CreateModelInput @@ -59,23 +58,13 @@ from tests.integ import DATA_DIR -@pytest.fixture -def role(sagemaker_session): - return get_execution_role(sagemaker_session) - - @pytest.fixture def pipeline_name(): return utils.unique_name_from_base("my-pipeline-model-regis") -@pytest.fixture -def region_name(sagemaker_session): - return sagemaker_session.boto_session.region_name - - def test_conditional_pytorch_training_model_registration( - sagemaker_session, + sagemaker_session_for_pipeline, role, cpu_instance_type, pipeline_name, @@ -83,7 +72,7 @@ def test_conditional_pytorch_training_model_registration( ): base_dir = os.path.join(DATA_DIR, "pytorch_mnist") entry_point = os.path.join(base_dir, "mnist.py") - input_path = sagemaker_session.upload_data( + input_path = sagemaker_session_for_pipeline.upload_data( path=os.path.join(base_dir, "training"), key_prefix="integ-test-data/pytorch_mnist/training", ) @@ -109,7 +98,7 @@ def test_conditional_pytorch_training_model_registration( py_version="py3", instance_count=instance_count, instance_type=instance_type, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) step_train = TrainingStep( name="pytorch-train", @@ -136,7 +125,7 @@ def test_conditional_pytorch_training_model_registration( model = Model( image_uri=pytorch_estimator.training_image_uri(), model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, role=role, ) model_inputs = CreateModelInput( @@ -168,7 +157,7 @@ def test_conditional_pytorch_training_model_registration( instance_count, ], steps=[step_train, step_cond], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -200,7 +189,7 @@ def test_conditional_pytorch_training_model_registration( def test_mxnet_model_registration( - sagemaker_session, + sagemaker_session_for_pipeline, role, cpu_instance_type, pipeline_name, @@ -227,7 +216,7 @@ def test_mxnet_model_registration( model_data=mx_mnist_model_data, framework_version="1.7.0", py_version="py3", - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) step_register = RegisterModel( @@ -249,7 +238,7 @@ def test_mxnet_model_registration( name=pipeline_name, parameters=[instance_count, instance_type], steps=[step_register], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -281,10 +270,10 @@ def test_mxnet_model_registration( def test_sklearn_xgboost_sip_model_registration( - sagemaker_session, role, pipeline_name, region_name + sagemaker_session_for_pipeline, role, pipeline_name, region_name ): prefix = "sip" - bucket_name = sagemaker_session.default_bucket() + bucket_name = sagemaker_session_for_pipeline.default_bucket() instance_count = ParameterInteger(name="InstanceCount", default_value=1) instance_type = "ml.m5.xlarge" @@ -301,7 +290,7 @@ def test_sklearn_xgboost_sip_model_registration( instance_type=instance_type, instance_count=instance_count, framework_version="0.20.0", - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) # The path to the raw data. @@ -376,7 +365,7 @@ def test_sklearn_xgboost_sip_model_registration( instance_type=instance_type, instance_count=instance_count, framework_version="0.90-2", - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, py_version="py3", role=role, ) @@ -412,7 +401,7 @@ def test_sklearn_xgboost_sip_model_registration( source_dir=source_dir, code_location=code_location, role=role, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, framework_version="0.20.0", py_version="py3", ) @@ -429,11 +418,11 @@ def test_sklearn_xgboost_sip_model_registration( framework_version="0.90-2", py_version="py3", role=role, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) pipeline_model = PipelineModel( - [xgboost_model, sklearn_model], role, sagemaker_session=sagemaker_session + [xgboost_model, sklearn_model], role, sagemaker_session=sagemaker_session_for_pipeline ) step_register = RegisterModel( @@ -462,7 +451,7 @@ def test_sklearn_xgboost_sip_model_registration( output_path_param, ], steps=[processing_step, training_step, step_register], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -500,7 +489,7 @@ def test_sklearn_xgboost_sip_model_registration( ), ) def test_model_registration_with_drift_check_baselines( - sagemaker_session, + sagemaker_session_for_pipeline, role, pipeline_name, ): @@ -510,12 +499,12 @@ def test_model_registration_with_drift_check_baselines( # upload model data to s3 model_local_path = os.path.join(DATA_DIR, "mxnet_mnist/model.tar.gz") model_base_uri = "s3://{}/{}/input/model/{}".format( - sagemaker_session.default_bucket(), + sagemaker_session_for_pipeline.default_bucket(), "register_model_test_with_drift_baseline", utils.unique_name_from_base("model"), ) model_uri = S3Uploader.upload( - model_local_path, model_base_uri, sagemaker_session=sagemaker_session + model_local_path, model_base_uri, sagemaker_session=sagemaker_session_for_pipeline ) model_uri_param = ParameterString(name="model_uri", default_value=model_uri) @@ -525,14 +514,14 @@ def test_model_registration_with_drift_check_baselines( '"standard_deviation": 2.219186917819692}}}' ) metrics_base_uri = "s3://{}/{}/input/metrics/{}".format( - sagemaker_session.default_bucket(), + sagemaker_session_for_pipeline.default_bucket(), "register_model_test_with_drift_baseline", utils.unique_name_from_base("metrics"), ) metrics_uri = S3Uploader.upload_string_as_file_body( body=metrics_data, desired_s3_uri=metrics_base_uri, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) metrics_uri_param = ParameterString(name="metrics_uri", default_value=metrics_uri) @@ -610,7 +599,7 @@ def test_model_registration_with_drift_check_baselines( instance_type=instance_type, instance_count=instance_count, framework_version="0.90-2", - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, py_version="py3", role=role, ) @@ -645,7 +634,7 @@ def test_model_registration_with_drift_check_baselines( instance_count, ], steps=[step_register], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -677,7 +666,7 @@ def test_model_registration_with_drift_check_baselines( assert execution_steps[0]["StepStatus"] == "Succeeded" assert execution_steps[0]["StepName"] == "MyRegisterModelStep-RegisterModel" - response = sagemaker_session.sagemaker_client.describe_model_package( + response = sagemaker_session_for_pipeline.sagemaker_client.describe_model_package( ModelPackageName=execution_steps[0]["Metadata"]["RegisterModel"]["Arn"] ) @@ -715,15 +704,15 @@ def test_model_registration_with_drift_check_baselines( def test_model_registration_with_model_repack( - sagemaker_session, + sagemaker_session_for_pipeline, role, pipeline_name, region_name, ): - kms_key = get_or_create_kms_key(sagemaker_session, role) + kms_key = get_or_create_kms_key(sagemaker_session_for_pipeline, role) base_dir = os.path.join(DATA_DIR, "pytorch_mnist") entry_point = os.path.join(base_dir, "mnist.py") - input_path = sagemaker_session.upload_data( + input_path = sagemaker_session_for_pipeline.upload_data( path=os.path.join(base_dir, "training"), key_prefix="integ-test-data/pytorch_mnist/training", ) @@ -742,7 +731,7 @@ def test_model_registration_with_model_repack( py_version="py3", instance_count=instance_count, instance_type=instance_type, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, output_kms_key=kms_key, ) step_train = TrainingStep( @@ -767,7 +756,7 @@ def test_model_registration_with_model_repack( model = Model( image_uri=pytorch_estimator.training_image_uri(), model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, role=role, ) model_inputs = CreateModelInput( @@ -791,7 +780,7 @@ def test_model_registration_with_model_repack( name=pipeline_name, parameters=[good_enough_input, instance_count], steps=[step_cond], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -823,11 +812,16 @@ def test_model_registration_with_model_repack( def test_model_registration_with_tensorflow_model_with_pipeline_model( - sagemaker_session, role, tf_full_version, tf_full_py_version, pipeline_name, region_name + sagemaker_session_for_pipeline, + role, + tf_full_version, + tf_full_py_version, + pipeline_name, + region_name, ): base_dir = os.path.join(DATA_DIR, "tensorflow_mnist") entry_point = os.path.join(base_dir, "mnist_v2.py") - input_path = sagemaker_session.upload_data( + input_path = sagemaker_session_for_pipeline.upload_data( path=os.path.join(base_dir, "data"), key_prefix="integ-test-data/tf-scriptmode/mnist/training", ) @@ -845,7 +839,7 @@ def test_model_registration_with_tensorflow_model_with_pipeline_model( instance_type=instance_type, framework_version=tf_full_version, py_version=tf_full_py_version, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) step_train = TrainingStep( name="MyTrain", @@ -858,11 +852,14 @@ def test_model_registration_with_tensorflow_model_with_pipeline_model( framework_version="2.4", model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, role=role, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) pipeline_model = PipelineModel( - name="MyModelPipeline", models=[model], role=role, sagemaker_session=sagemaker_session + name="MyModelPipeline", + models=[model], + role=role, + sagemaker_session=sagemaker_session_for_pipeline, ) step_register_model = RegisterModel( @@ -880,7 +877,7 @@ def test_model_registration_with_tensorflow_model_with_pipeline_model( name=pipeline_name, parameters=[instance_count], steps=[step_train, step_register_model], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: diff --git a/tests/integ/sagemaker/workflow/test_pipeline_var_behaviors.py b/tests/integ/sagemaker/workflow/test_pipeline_var_behaviors.py index 6e17e31ad1..2bcdbc6e0c 100644 --- a/tests/integ/sagemaker/workflow/test_pipeline_var_behaviors.py +++ b/tests/integ/sagemaker/workflow/test_pipeline_var_behaviors.py @@ -15,7 +15,7 @@ import pytest from tests.integ.sagemaker.workflow.helpers import wait_pipeline_execution -from sagemaker import get_execution_role, utils +from sagemaker import utils from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThan from sagemaker.workflow.fail_step import FailStep @@ -24,17 +24,12 @@ from sagemaker.workflow.pipeline import Pipeline -@pytest.fixture -def role(sagemaker_session): - return get_execution_role(sagemaker_session) - - @pytest.fixture def pipeline_name(): return utils.unique_name_from_base("my-pipeline-vars") -def test_ppl_var_to_string_and_add(sagemaker_session, role, pipeline_name): +def test_ppl_var_to_string_and_add(sagemaker_session_for_pipeline, role, pipeline_name): param_str = ParameterString(name="MyString", default_value="1") param_int = ParameterInteger(name="MyInteger", default_value=3) @@ -65,7 +60,7 @@ def test_ppl_var_to_string_and_add(sagemaker_session, role, pipeline_name): name=pipeline_name, parameters=[param_str, param_int], steps=[step_cond, step_fail], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: diff --git a/tests/integ/sagemaker/workflow/test_retry.py b/tests/integ/sagemaker/workflow/test_retry.py index e4c6f52a60..31c9859d50 100644 --- a/tests/integ/sagemaker/workflow/test_retry.py +++ b/tests/integ/sagemaker/workflow/test_retry.py @@ -20,7 +20,6 @@ from tests.integ.sagemaker.workflow.helpers import wait_pipeline_execution from sagemaker.processing import ProcessingInput -from sagemaker.session import get_execution_role from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.dataset_definition.inputs import ( DatasetDefinition, @@ -51,33 +50,13 @@ from tests.integ import DATA_DIR -@pytest.fixture(scope="module") -def region_name(sagemaker_session): - return sagemaker_session.boto_session.region_name - - -@pytest.fixture(scope="module") -def role(sagemaker_session): - return get_execution_role(sagemaker_session) - - -@pytest.fixture(scope="module") -def script_dir(): - return os.path.join(DATA_DIR, "sklearn_processing") - - @pytest.fixture def pipeline_name(): return f"my-pipeline-{int(time.time() * 10**7)}" @pytest.fixture -def smclient(sagemaker_session): - return sagemaker_session.boto_session.client("sagemaker") - - -@pytest.fixture -def athena_dataset_definition(sagemaker_session): +def athena_dataset_definition(sagemaker_session_for_pipeline): return DatasetDefinition( local_path="/opt/ml/processing/input/add", data_distribution_type="FullyReplicated", @@ -87,7 +66,7 @@ def athena_dataset_definition(sagemaker_session): database="default", work_group="workgroup", query_string='SELECT * FROM "default"."s3_test_table_$STAGE_$REGIONUNDERSCORED";', - output_s3_uri=f"s3://{sagemaker_session.default_bucket()}/add", + output_s3_uri=f"s3://{sagemaker_session_for_pipeline.default_bucket()}/add", output_format="JSON", output_compression="GZIP", ), @@ -95,7 +74,7 @@ def athena_dataset_definition(sagemaker_session): def test_pipeline_execution_processing_step_with_retry( - sagemaker_session, + sagemaker_session_for_pipeline, smclient, role, sklearn_latest_version, @@ -117,7 +96,7 @@ def test_pipeline_execution_processing_step_with_retry( instance_type=cpu_instance_type, instance_count=instance_count, command=["python3"], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, base_job_name="test-sklearn", ) @@ -146,7 +125,7 @@ def test_pipeline_execution_processing_step_with_retry( name=pipeline_name, parameters=[instance_count], steps=[step_sklearn], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: diff --git a/tests/integ/sagemaker/workflow/test_selective_execution.py b/tests/integ/sagemaker/workflow/test_selective_execution.py index a584c095d5..751b19f0b7 100644 --- a/tests/integ/sagemaker/workflow/test_selective_execution.py +++ b/tests/integ/sagemaker/workflow/test_selective_execution.py @@ -24,7 +24,7 @@ from sagemaker.workflow.selective_execution_config import SelectiveExecutionConfig from tests.integ.sagemaker.workflow.helpers import create_and_execute_pipeline -from sagemaker import utils, get_execution_role +from sagemaker import utils from sagemaker.workflow.function_step import step from sagemaker.workflow.pipeline import Pipeline from sagemaker.workflow.steps import ProcessingStep @@ -32,23 +32,13 @@ INSTANCE_TYPE = "ml.m5.large" -@pytest.fixture -def role(sagemaker_session): - return get_execution_role(sagemaker_session) - - -@pytest.fixture -def region_name(sagemaker_session): - return sagemaker_session.boto_session.region_name - - @pytest.fixture def pipeline_name(): return utils.unique_name_from_base("Selective-Pipeline") def test_selective_execution_among_pure_function_steps( - sagemaker_session, role, pipeline_name, region_name, dummy_container_without_error + sagemaker_session_for_pipeline, role, pipeline_name, region_name, dummy_container_without_error ): # Test Selective Pipeline Execution on function step1 -> [select: function step2] os.environ["AWS_DEFAULT_REGION"] = region_name @@ -75,7 +65,7 @@ def sum(a, b): pipeline = Pipeline( name=pipeline_name, steps=[step_output_b], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -117,7 +107,7 @@ def sum(a, b): def test_selective_execution_of_regular_step_referenced_by_function_step( - sagemaker_session, + sagemaker_session_for_pipeline, role, pipeline_name, region_name, @@ -135,7 +125,7 @@ def test_selective_execution_of_regular_step_referenced_by_function_step( instance_type=INSTANCE_TYPE, instance_count=1, command=["python3"], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, base_job_name="test-sklearn", ) @@ -159,7 +149,7 @@ def func_2(arg): pipeline = Pipeline( name=pipeline_name, steps=[final_output], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: diff --git a/tests/integ/sagemaker/workflow/test_step_decorator.py b/tests/integ/sagemaker/workflow/test_step_decorator.py index 3c19a37cc3..216bf5674a 100644 --- a/tests/integ/sagemaker/workflow/test_step_decorator.py +++ b/tests/integ/sagemaker/workflow/test_step_decorator.py @@ -19,7 +19,7 @@ import os import random -from sagemaker import get_execution_role, utils +from sagemaker import utils from sagemaker.config import load_sagemaker_config from sagemaker.processing import ProcessingInput from sagemaker.remote_function.errors import RemoteFunctionError @@ -51,22 +51,14 @@ INSTANCE_TYPE = "ml.m5.large" -@pytest.fixture -def role(sagemaker_session): - return get_execution_role(sagemaker_session) - - -@pytest.fixture -def region_name(sagemaker_session): - return sagemaker_session.boto_session.region_name - - @pytest.fixture def pipeline_name(): return utils.unique_name_from_base("Decorated-Step-Pipeline") -def test_compile_pipeline_with_function_steps(sagemaker_session, role, pipeline_name, region_name): +def test_compile_pipeline_with_function_steps( + sagemaker_session_for_pipeline, role, pipeline_name, region_name +): @step( name="generate", role=role, @@ -94,7 +86,7 @@ def print_result(result): pipeline = Pipeline( name=pipeline_name, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, steps=[generated, conditional_print], ) @@ -104,9 +96,9 @@ def print_result(result): # verify the artifacts are uploaded to the location specified by sagemaker_session assert ( len( - sagemaker_session.list_s3_files( - sagemaker_session.default_bucket(), - f"{sagemaker_session.default_bucket_prefix}/{pipeline_name}/generate", + sagemaker_session_for_pipeline.list_s3_files( + sagemaker_session_for_pipeline.default_bucket(), + f"{sagemaker_session_for_pipeline.default_bucket_prefix}/{pipeline_name}/generate", ) ) > 0 @@ -114,9 +106,9 @@ def print_result(result): assert ( len( - sagemaker_session.list_s3_files( - sagemaker_session.default_bucket(), - f"{sagemaker_session.default_bucket_prefix}/{pipeline_name}/print", + sagemaker_session_for_pipeline.list_s3_files( + sagemaker_session_for_pipeline.default_bucket(), + f"{sagemaker_session_for_pipeline.default_bucket_prefix}/{pipeline_name}/print", ) ) > 0 @@ -129,7 +121,7 @@ def print_result(result): def test_step_decorator_no_dependencies( - sagemaker_session, role, pipeline_name, region_name, dummy_container_without_error + sagemaker_session_for_pipeline, role, pipeline_name, region_name, dummy_container_without_error ): os.environ["AWS_DEFAULT_REGION"] = region_name @@ -149,7 +141,7 @@ def sum(a, b): pipeline = Pipeline( name=pipeline_name, steps=[step_output_a, step_output_b], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -172,7 +164,7 @@ def sum(a, b): def test_step_decorator_with_execution_dependencies( - sagemaker_session, role, pipeline_name, region_name, dummy_container_without_error + sagemaker_session_for_pipeline, role, pipeline_name, region_name, dummy_container_without_error ): os.environ["AWS_DEFAULT_REGION"] = region_name @@ -193,7 +185,7 @@ def sum(a, b): pipeline = Pipeline( name=pipeline_name, steps=[step_output_b], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -216,7 +208,7 @@ def sum(a, b): def test_step_decorator_with_data_dependencies( - sagemaker_session, role, pipeline_name, region_name, dummy_container_without_error + sagemaker_session_for_pipeline, role, pipeline_name, region_name, dummy_container_without_error ): os.environ["AWS_DEFAULT_REGION"] = region_name @@ -242,7 +234,7 @@ def sum(a, b): pipeline = Pipeline( name=pipeline_name, steps=[step_output_b], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -266,7 +258,7 @@ def sum(a, b): def test_step_decorator_with_pipeline_parameters( - sagemaker_session, role, pipeline_name, region_name, dummy_container_without_error + sagemaker_session_for_pipeline, role, pipeline_name, region_name, dummy_container_without_error ): os.environ["AWS_DEFAULT_REGION"] = region_name instance_type = ParameterString(name="TrainingInstanceCount", default_value=INSTANCE_TYPE) @@ -287,7 +279,7 @@ def sum(a, b): name=pipeline_name, parameters=[instance_type], steps=[step_a], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -312,7 +304,7 @@ def sum(a, b): def test_passing_different_pipeline_variables_to_function( - sagemaker_session, + sagemaker_session_for_pipeline, role, pipeline_name, region_name, @@ -334,7 +326,7 @@ def test_passing_different_pipeline_variables_to_function( instance_type=INSTANCE_TYPE, instance_count=1, command=["python3"], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, base_job_name="test-sklearn", ) @@ -376,7 +368,7 @@ def func_2(*args): name=pipeline_name, parameters=[param_a, param_b, param_c, param_d], steps=[final_output], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -402,7 +394,7 @@ def func_2(*args): def test_step_decorator_with_pre_execution_script( - sagemaker_session, role, pipeline_name, region_name, dummy_container_without_error + sagemaker_session_for_pipeline, role, pipeline_name, region_name, dummy_container_without_error ): os.environ["AWS_DEFAULT_REGION"] = region_name pre_execution_script_path = os.path.join(DATA_DIR, "workflow", "pre_exec_commands") @@ -428,7 +420,7 @@ def validate_file_exists(files_exists, files_does_not_exist): pipeline = Pipeline( name=pipeline_name, steps=[step_a], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -450,17 +442,22 @@ def validate_file_exists(files_exists, files_does_not_exist): def test_step_decorator_with_include_local_workdir( - sagemaker_session, role, pipeline_name, region_name, monkeypatch, dummy_container_without_error + sagemaker_session_for_pipeline, + role, + pipeline_name, + region_name, + monkeypatch, + dummy_container_without_error, ): os.environ["AWS_DEFAULT_REGION"] = region_name source_dir_path = os.path.join(os.path.dirname(__file__)) - original_sagemaker_config = sagemaker_session.sagemaker_config + original_sagemaker_config = sagemaker_session_for_pipeline.sagemaker_config with monkeypatch.context() as m: m.chdir(source_dir_path) sagemaker_config = load_sagemaker_config( [os.path.join(DATA_DIR, "workflow", "config.yaml")] ) - sagemaker_session.sagemaker_config = sagemaker_config + sagemaker_session_for_pipeline.sagemaker_config = sagemaker_config dependencies_path = os.path.join(DATA_DIR, "workflow", "requirements.txt") @step( @@ -483,7 +480,7 @@ def train(x): pipeline = Pipeline( name=pipeline_name, steps=[step_result], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -504,11 +501,16 @@ def train(x): pipeline.delete() except Exception: pass - sagemaker_session.sagemaker_config = original_sagemaker_config + sagemaker_session_for_pipeline.sagemaker_config = original_sagemaker_config def test_decorator_with_conda_env( - sagemaker_session, role, pipeline_name, region_name, dummy_container_with_conda, conda_env_yml + sagemaker_session_for_pipeline, + role, + pipeline_name, + region_name, + dummy_container_with_conda, + conda_env_yml, ): os.environ["AWS_DEFAULT_REGION"] = region_name @@ -529,7 +531,7 @@ def cuberoot(x): pipeline = Pipeline( name=pipeline_name, steps=[step_a], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -553,7 +555,7 @@ def cuberoot(x): def test_decorator_step_failed( - sagemaker_session, + sagemaker_session_for_pipeline, role, pipeline_name, region_name, @@ -575,7 +577,7 @@ def divide(x, y): pipeline = Pipeline( name=pipeline_name, steps=[step_a], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -602,7 +604,7 @@ def divide(x, y): def test_decorator_step_with_json_get( - sagemaker_session, + sagemaker_session_for_pipeline, role, pipeline_name, region_name, @@ -651,7 +653,7 @@ def func3(): pipeline = Pipeline( name=pipeline_name, steps=[cond_step], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -754,7 +756,7 @@ def func(var: int): def test_decorator_step_checksum_mismatch( - sagemaker_session, dummy_container_without_error, pipeline_name, role + sagemaker_session_for_pipeline, dummy_container_without_error, pipeline_name, role ): step_name = "original_func_step" @@ -778,7 +780,7 @@ def updated_func(x): pipeline = Pipeline( name=pipeline_name, steps=[step_a], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -796,7 +798,7 @@ def updated_func(x): pickled_updated_func, s3_path_join(s3_base_uri, step_name, build_time, "function", "payload.pkl"), kms_key=None, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) execution = pipeline.start() wait_pipeline_execution(execution=execution, delay=20, max_attempts=20) @@ -815,7 +817,11 @@ def updated_func(x): def test_with_user_and_workdir_set_in_the_image( - sagemaker_session, role, pipeline_name, region_name, dummy_container_with_user_and_workdir + sagemaker_session_for_pipeline, + role, + pipeline_name, + region_name, + dummy_container_with_user_and_workdir, ): os.environ["AWS_DEFAULT_REGION"] = region_name dependencies_path = os.path.join(DATA_DIR, "workflow", "requirements.txt") @@ -836,7 +842,7 @@ def cuberoot(x): pipeline = Pipeline( name=pipeline_name, steps=[step_a], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -860,7 +866,11 @@ def cuberoot(x): def test_with_user_and_workdir_set_in_the_image_client_error_case( - sagemaker_session, role, pipeline_name, region_name, dummy_container_with_user_and_workdir + sagemaker_session_for_pipeline, + role, + pipeline_name, + region_name, + dummy_container_with_user_and_workdir, ): # This test aims to ensure client error in step decorated function # can be successfully surfaced and the job can be failed. @@ -880,7 +890,7 @@ def my_func(): pipeline = Pipeline( name=pipeline_name, steps=[step_a], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -906,7 +916,7 @@ def my_func(): def test_step_level_serialization( - sagemaker_session, role, pipeline_name, region_name, dummy_container_without_error + sagemaker_session_for_pipeline, role, pipeline_name, region_name, dummy_container_without_error ): os.environ["AWS_DEFAULT_REGION"] = region_name @@ -940,7 +950,7 @@ def func_with_collision(var: str): pipeline = Pipeline( # noqa: F811 name=pipeline_name, steps=[step_output_b], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: diff --git a/tests/integ/sagemaker/workflow/test_workflow.py b/tests/integ/sagemaker/workflow/test_workflow.py index 462d774885..81871d13f4 100644 --- a/tests/integ/sagemaker/workflow/test_workflow.py +++ b/tests/integ/sagemaker/workflow/test_workflow.py @@ -50,7 +50,6 @@ ScriptProcessor, ) from sagemaker.s3 import S3Uploader -from sagemaker.session import get_execution_role from sagemaker.sklearn.estimator import SKLearn from sagemaker.transformer import Transformer from sagemaker.sklearn.processing import SKLearnProcessor @@ -107,23 +106,8 @@ def ordered(obj): @pytest.fixture(scope="module") -def region_name(sagemaker_session): - return sagemaker_session.boto_session.region_name - - -@pytest.fixture(scope="module") -def role(sagemaker_session): - return get_execution_role(sagemaker_session) - - -@pytest.fixture(scope="module") -def script_dir(): - return os.path.join(DATA_DIR, "sklearn_processing") - - -@pytest.fixture(scope="module") -def feature_store_session(sagemaker_session): - boto_session = sagemaker_session.boto_session +def feature_store_session(sagemaker_session_for_pipeline): + boto_session = sagemaker_session_for_pipeline.boto_session sagemaker_client = boto_session.client("sagemaker") featurestore_runtime_client = boto_session.client("sagemaker-featurestore-runtime") @@ -140,7 +124,7 @@ def pipeline_name(): @pytest.fixture(scope="module") -def athena_dataset_definition(sagemaker_session): +def athena_dataset_definition(sagemaker_session_for_pipeline): return DatasetDefinition( local_path="/opt/ml/processing/input/add", data_distribution_type="FullyReplicated", @@ -150,7 +134,7 @@ def athena_dataset_definition(sagemaker_session): database="default", work_group="workgroup", query_string=('SELECT * FROM "default"."s3_test_table_$STAGE_$REGIONUNDERSCORED";'), - output_s3_uri=f"s3://{sagemaker_session.default_bucket()}/add", + output_s3_uri=f"s3://{sagemaker_session_for_pipeline.default_bucket()}/add", output_format="JSON", output_compression="GZIP", ), @@ -486,7 +470,7 @@ def test_steps_with_map_params_pipeline( def test_one_step_ingestion_pipeline( - sagemaker_session, feature_store_session, feature_definitions, role, pipeline_name + sagemaker_session_for_pipeline, feature_store_session, feature_definitions, role, pipeline_name ): instance_count = ParameterInteger(name="InstanceCount", default_value=1) instance_type = ParameterString(name="InstanceType", default_value="ml.m5.4xlarge") @@ -495,7 +479,7 @@ def test_one_step_ingestion_pipeline( input_file_path = os.path.join(DATA_DIR, "workflow", "features.csv") input_data_uri = os.path.join( "s3://", - sagemaker_session.default_bucket(), + sagemaker_session_for_pipeline.default_bucket(), "py-sdk-ingestion-test-input/features.csv", ) @@ -504,7 +488,7 @@ def test_one_step_ingestion_pipeline( S3Uploader.upload_string_as_file_body( body=body, desired_s3_uri=input_data_uri, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) inputs = [ @@ -550,7 +534,7 @@ def test_one_step_ingestion_pipeline( data_wrangler_flow_source=temp_flow_path, instance_count=instance_count, instance_type=instance_type, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, max_runtime_in_seconds=86400, ) @@ -566,7 +550,7 @@ def test_one_step_ingestion_pipeline( name=pipeline_name, parameters=[instance_count, instance_type], steps=[data_wrangler_step], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -574,7 +558,7 @@ def test_one_step_ingestion_pipeline( create_arn = response["PipelineArn"] offline_store_s3_uri = os.path.join( - "s3://", sagemaker_session.default_bucket(), feature_group_name + "s3://", sagemaker_session_for_pipeline.default_bucket(), feature_group_name ) feature_group.create( s3_uri=offline_store_s3_uri, @@ -626,15 +610,15 @@ def test_one_step_ingestion_pipeline( and only run as part of the 'lineage' test suite.""" ) def test_end_to_end_pipeline_successful_execution( - sagemaker_session, region_name, role, pipeline_name, wait=False + sagemaker_session_for_pipeline, region_name, role, pipeline_name, wait=False ): model_package_group_name = f"{pipeline_name}ModelPackageGroup" data_path = os.path.join(DATA_DIR, "workflow") - default_bucket = sagemaker_session.default_bucket() + default_bucket = sagemaker_session_for_pipeline.default_bucket() # download the input data local_input_path = os.path.join(data_path, "abalone-dataset.csv") - s3 = sagemaker_session.boto_session.resource("s3") + s3 = sagemaker_session_for_pipeline.boto_session.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region_name}").download_file( "dataset/abalone-dataset.csv", local_input_path ) @@ -646,7 +630,7 @@ def test_end_to_end_pipeline_successful_execution( input_data_uri = S3Uploader.upload_string_as_file_body( body=body, desired_s3_uri=f"{base_uri}/abalone-dataset.csv", - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) # download batch transform data @@ -661,7 +645,7 @@ def test_end_to_end_pipeline_successful_execution( batch_data_uri = S3Uploader.upload_string_as_file_body( body=body, desired_s3_uri=f"{base_uri}/abalone-dataset-batch", - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) # define parameters @@ -690,7 +674,7 @@ def test_end_to_end_pipeline_successful_execution( instance_count=processing_instance_count, base_job_name=f"{pipeline_name}-process", role=role, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) step_process = ProcessingStep( name="AbaloneProcess", @@ -721,7 +705,7 @@ def test_end_to_end_pipeline_successful_execution( instance_count=1, output_path=model_path, role=role, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) xgb_train.set_hyperparameters( objective="reg:linear", @@ -760,7 +744,7 @@ def test_end_to_end_pipeline_successful_execution( instance_count=1, base_job_name=f"{pipeline_name}-eval", role=role, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) evaluation_report = PropertyFile( name="EvaluationReport", output_name="evaluation", path="evaluation.json" @@ -791,7 +775,7 @@ def test_end_to_end_pipeline_successful_execution( model = Model( image_uri=image_uri, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, role=role, ) inputs = CreateModelInput( @@ -810,7 +794,7 @@ def test_end_to_end_pipeline_successful_execution( instance_type="ml.m5.xlarge", instance_count=1, output_path=f"s3://{default_bucket}/{pipeline_name}Transform", - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) step_transform = TransformStep( name="AbaloneTransform", @@ -869,7 +853,7 @@ def test_end_to_end_pipeline_successful_execution( batch_data, ], steps=[step_process, step_train, step_eval, step_cond], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) pipeline.create(role) @@ -924,7 +908,7 @@ def cleanup_feature_group(feature_group: FeatureGroup): pass -def test_large_pipeline(sagemaker_session, role, pipeline_name, region_name): +def test_large_pipeline(sagemaker_session_for_pipeline, role, pipeline_name, region_name): instance_count = ParameterInteger(name="InstanceCount", default_value=2) outputParam = CallbackOutput(output_name="output", output_type=CallbackOutputTypeEnum.String) @@ -942,7 +926,7 @@ def test_large_pipeline(sagemaker_session, role, pipeline_name, region_name): name=pipeline_name, parameters=[instance_count], steps=callback_steps, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -970,7 +954,7 @@ def test_large_pipeline(sagemaker_session, role, pipeline_name, region_name): def test_create_and_update_with_parallelism_config( - sagemaker_session, role, pipeline_name, region_name + sagemaker_session_for_pipeline, role, pipeline_name, region_name ): instance_count = ParameterInteger(name="InstanceCount", default_value=2) @@ -989,7 +973,7 @@ def test_create_and_update_with_parallelism_config( name=pipeline_name, parameters=[instance_count], steps=callback_steps, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: @@ -1228,7 +1212,7 @@ def test_model_registration_with_tuning_model( pass -def _verify_repack_output(repack_step_dict, sagemaker_session): +def _verify_repack_output(repack_step_dict, sagemaker_session_for_pipeline): # This is to verify if the `requirements.txt` provided in ModelStep # is not auto installed in the Repack step but is successfully repacked # in the new model.tar.gz @@ -1236,12 +1220,14 @@ def _verify_repack_output(repack_step_dict, sagemaker_session): # so if the `requirements.txt` is auto installed, it should raise an exception # caused by the unsupported library version listed in the `requirements.txt` training_job_arn = repack_step_dict["Metadata"]["TrainingJob"]["Arn"] - job_description = sagemaker_session.sagemaker_client.describe_training_job( + job_description = sagemaker_session_for_pipeline.sagemaker_client.describe_training_job( TrainingJobName=training_job_arn.split("/")[1] ) model_uri = job_description["ModelArtifacts"]["S3ModelArtifacts"] with tempfile.TemporaryDirectory() as tmp: - extract_files_from_s3(s3_url=model_uri, tmpdir=tmp, sagemaker_session=sagemaker_session) + extract_files_from_s3( + s3_url=model_uri, tmpdir=tmp, sagemaker_session=sagemaker_session_for_pipeline + ) def walk(): results = set() diff --git a/tests/integ/sagemaker/workflow/test_workflow_with_clarify.py b/tests/integ/sagemaker/workflow/test_workflow_with_clarify.py index 4a7feb597b..c96968a99b 100644 --- a/tests/integ/sagemaker/workflow/test_workflow_with_clarify.py +++ b/tests/integ/sagemaker/workflow/test_workflow_with_clarify.py @@ -32,7 +32,6 @@ SageMakerClarifyProcessor, ) from sagemaker.processing import ProcessingInput, ProcessingOutput -from sagemaker.session import get_execution_role from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ConditionStep, JsonGet @@ -49,11 +48,6 @@ from tests.integ import timeout -@pytest.fixture(scope="module") -def role(sagemaker_session): - return get_execution_role(sagemaker_session) - - @pytest.fixture def pipeline_name(): return f"my-pipeline-clarify-{int(time.time() * 10**7)}" @@ -88,8 +82,10 @@ def headers(): @pytest.fixture(scope="module") -def data_config(sagemaker_session, data_path, headers): - output_path = f"s3://{sagemaker_session.default_bucket()}/linear_learner_analysis_result" +def data_config(sagemaker_session_for_pipeline, data_path, headers): + output_path = ( + f"s3://{sagemaker_session_for_pipeline.default_bucket()}/linear_learner_analysis_result" + ) return DataConfig( s3_data_input_path=data_path, s3_output_path=output_path, @@ -110,7 +106,7 @@ def data_bias_config(): @pytest.yield_fixture(scope="module") -def model_name(sagemaker_session, cpu_instance_type, training_set): +def model_name(sagemaker_session_for_pipeline, cpu_instance_type, training_set): job_name = utils.unique_name_from_base("clarify-xgb") with timeout.timeout(minutes=integ.TRAINING_DEFAULT_TIMEOUT_MINUTES): @@ -119,7 +115,7 @@ def model_name(sagemaker_session, cpu_instance_type, training_set): 1, cpu_instance_type, predictor_type="binary_classifier", - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, disable_profiler=True, ) ll.binary_classifier_model_selection_criteria = "accuracy" @@ -135,7 +131,7 @@ def model_name(sagemaker_session, cpu_instance_type, training_set): job_name=job_name, ) - with timeout.timeout_and_delete_endpoint_by_name(job_name, sagemaker_session): + with timeout.timeout_and_delete_endpoint_by_name(job_name, sagemaker_session_for_pipeline): ll.deploy(1, cpu_instance_type, endpoint_name=job_name, model_name=job_name, wait=True) yield job_name @@ -151,10 +147,10 @@ def model_config(model_name): @pytest.fixture(scope="module") -def model_predicted_label_config(sagemaker_session, model_name, training_set): +def model_predicted_label_config(sagemaker_session_for_pipeline, model_name, training_set): predictor = LinearLearnerPredictor( model_name, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) result = predictor.predict(training_set[0].astype(np.float32)) predictions = [float(record.label["score"].float32_tensor.values[0]) for record in result] @@ -169,7 +165,7 @@ def test_workflow_with_clarify( model_predicted_label_config, pipeline_name, role, - sagemaker_session, + sagemaker_session_for_pipeline, ): instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge") @@ -221,7 +217,7 @@ def test_workflow_with_clarify( role="SageMakerRole", instance_count=instance_count, instance_type=instance_type, - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) property_file = PropertyFile( @@ -256,7 +252,7 @@ def test_workflow_with_clarify( name=pipeline_name, parameters=[instance_type, instance_count], steps=[step_process, step_condition], - sagemaker_session=sagemaker_session, + sagemaker_session=sagemaker_session_for_pipeline, ) try: