diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index b02de14b4e82b..0058d3a3f9960 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -304,6 +304,10 @@ def test(args): raise AirflowException('dag_id could not be found') dag = dagbag.dags[args.dag_id] task = dag.get_task(task_id=args.task_id) + # Add CLI provided task_params to task.params + if args.task_params: + passed_in_params = json.loads(args.task_params) + task.params.update(passed_in_params) ti = TaskInstance(task, args.execution_date) if args.dry_run: @@ -636,6 +640,8 @@ def get_parser(): default=DAGS_FOLDER) parser_test.add_argument( "-dr", "--dry_run", help="Perform a dry run", action="store_true") + parser_test.add_argument( + "-tp", "--task_params", help="Sends a JSON params dict to the task") parser_test.set_defaults(func=test) ht = "Get the status of a task instance." diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py new file mode 100644 index 0000000000000..faee40df9f4c7 --- /dev/null +++ b/airflow/example_dags/example_passing_params_via_test_command.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.operators import BashOperator, PythonOperator + +dag = DAG("example_passing_params_via_test_command", + default_args={"owner" : "me", + "start_date":datetime.now()}, + schedule_interval='*/1 * * * *', + dagrun_timeout=timedelta(minutes=4) + ) + +def my_py_command(ds, **kwargs): + # Print out the "foo" param passed in via + # `airflow test example_passing_params_via_test_command run_this + # -tp '{"foo":"bar"}'` + if kwargs["test_mode"]: + print(" 'foo' was passed in via test={} command : kwargs[params][foo] \ + = {}".format( kwargs["test_mode"], kwargs["params"]["foo"]) ) + # Print out the value of "miff", passed in below via the Python Operator + print(" 'miff' was passed in via task params = {}".format( kwargs["params"]["miff"]) ) + return 1 + +my_templated_command = """ + echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} " + echo " 'miff was passed in via BashOperator with value {{ params.miff }} " +""" + +run_this = PythonOperator( + task_id='run_this', + provide_context=True, + python_callable=my_py_command, + params={"miff":"agg"}, + dag=dag) + +also_run_this = BashOperator( + task_id='also_run_this', + bash_command=my_templated_command, + params={"miff":"agg"}, + dag=dag) +also_run_this.set_upstream(run_this) diff --git a/tests/core.py b/tests/core.py index 734c46aa48c4a..68b864d79c9d5 100644 --- a/tests/core.py +++ b/tests/core.py @@ -31,7 +31,7 @@ from airflow.utils import AirflowException from airflow.configuration import AirflowConfigException -NUM_EXAMPLE_DAGS = 12 +NUM_EXAMPLE_DAGS = 13 DEV_NULL = '/dev/null' DEFAULT_DATE = datetime(2015, 1, 1) DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat() @@ -638,6 +638,14 @@ def test_cli_test(self): 'test', 'example_bash_operator', 'runme_0', '--dry_run', DEFAULT_DATE.isoformat()])) + def test_cli_test_with_params(self): + cli.test(self.parser.parse_args([ + 'test', 'example_passing_params_via_test_command', 'run_this', + '-tp', '{"foo":"bar"}', DEFAULT_DATE.isoformat()])) + cli.test(self.parser.parse_args([ + 'test', 'example_passing_params_via_test_command', 'also_run_this', + '-tp', '{"foo":"bar"}', DEFAULT_DATE.isoformat()])) + def test_cli_run(self): cli.run(self.parser.parse_args([ 'run', 'example_bash_operator', 'runme_0', '-l',