Skip to content

Commit

Permalink
Merge pull request apache#1147 from r39132/master
Browse files Browse the repository at this point in the history
Enhance CLI Test command to accept a JSON-formatted dictionary of par…
  • Loading branch information
r39132 committed Mar 15, 2016
2 parents 2e6447b + 0f95489 commit ffc0701
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 1 deletion.
6 changes: 6 additions & 0 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ def test(args):
raise AirflowException('dag_id could not be found')
dag = dagbag.dags[args.dag_id]
task = dag.get_task(task_id=args.task_id)
# Add CLI provided task_params to task.params
if args.task_params:
passed_in_params = json.loads(args.task_params)
task.params.update(passed_in_params)
ti = TaskInstance(task, args.execution_date)

if args.dry_run:
Expand Down Expand Up @@ -636,6 +640,8 @@ def get_parser():
default=DAGS_FOLDER)
parser_test.add_argument(
"-dr", "--dry_run", help="Perform a dry run", action="store_true")
parser_test.add_argument(
"-tp", "--task_params", help="Sends a JSON params dict to the task")
parser_test.set_defaults(func=test)

ht = "Get the status of a task instance."
Expand Down
56 changes: 56 additions & 0 deletions airflow/example_dags/example_passing_params_via_test_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator

dag = DAG("example_passing_params_via_test_command",
default_args={"owner" : "me",
"start_date":datetime.now()},
schedule_interval='*/1 * * * *',
dagrun_timeout=timedelta(minutes=4)
)

def my_py_command(ds, **kwargs):
# Print out the "foo" param passed in via
# `airflow test example_passing_params_via_test_command run_this <date>
# -tp '{"foo":"bar"}'`
if kwargs["test_mode"]:
print(" 'foo' was passed in via test={} command : kwargs[params][foo] \
= {}".format( kwargs["test_mode"], kwargs["params"]["foo"]) )
# Print out the value of "miff", passed in below via the Python Operator
print(" 'miff' was passed in via task params = {}".format( kwargs["params"]["miff"]) )
return 1

my_templated_command = """
echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} "
echo " 'miff was passed in via BashOperator with value {{ params.miff }} "
"""

run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=my_py_command,
params={"miff":"agg"},
dag=dag)

also_run_this = BashOperator(
task_id='also_run_this',
bash_command=my_templated_command,
params={"miff":"agg"},
dag=dag)
also_run_this.set_upstream(run_this)
10 changes: 9 additions & 1 deletion tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from airflow.utils import AirflowException
from airflow.configuration import AirflowConfigException

NUM_EXAMPLE_DAGS = 12
NUM_EXAMPLE_DAGS = 13
DEV_NULL = '/dev/null'
DEFAULT_DATE = datetime(2015, 1, 1)
DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
Expand Down Expand Up @@ -638,6 +638,14 @@ def test_cli_test(self):
'test', 'example_bash_operator', 'runme_0', '--dry_run',
DEFAULT_DATE.isoformat()]))

def test_cli_test_with_params(self):
cli.test(self.parser.parse_args([
'test', 'example_passing_params_via_test_command', 'run_this',
'-tp', '{"foo":"bar"}', DEFAULT_DATE.isoformat()]))
cli.test(self.parser.parse_args([
'test', 'example_passing_params_via_test_command', 'also_run_this',
'-tp', '{"foo":"bar"}', DEFAULT_DATE.isoformat()]))

def test_cli_run(self):
cli.run(self.parser.parse_args([
'run', 'example_bash_operator', 'runme_0', '-l',
Expand Down

0 comments on commit ffc0701

Please sign in to comment.