From e8c1144bb86e9a1f8b0195375185e9418f23f0c5 Mon Sep 17 00:00:00 2001
From: Bolke de Bruin
Date: Wed, 6 Apr 2016 15:01:17 +0200
Subject: [PATCH] Add consistent and thorough signal handling and logging

Airflow spawns children in the form of a webserver, scheduler, and executors.
If the parent gets terminated (SIGTERM) it needs to properly propagate the
signals to the children, otherwise they get orphaned and end up as zombie
processes. This patch resolves that issue.

In addition, Airflow does not store the PIDs of its services, so they cannot
be managed by traditional Unix service managers such as rc.d, upstart, or
systemd. This patch adds the "--pid" flag. By default the PID is stored in
~/airflow/airflow-<service>.pid.

Lastly, the patch adds support for separate log file locations: log, stdout,
and stderr (respectively --log-file, --stdout, --stderr). By default these
are stored in ~/airflow/airflow-<service>.log/.out/.err.

A minimal sketch of this daemonize-and-propagate pattern is included after
the diff.

* Resolves ISSUE-852
---
 UPDATING.md                               |   7 +
 airflow/bin/cli.py                        | 169 ++++++++++++++++++++--
 airflow/executors/base_executor.py        |   6 +
 airflow/executors/local_executor.py       |   4 +-
 airflow/jobs.py                           |  11 +-
 docs/start.rst                            |   4 +-
 scripts/ci/requirements.txt               |   1 +
 scripts/systemd/README                    |   7 +-
 scripts/systemd/airflow-webserver.service |  10 +-
 scripts/systemd/airflow.conf              |   1 +
 setup.py                                  |   1 +
 11 files changed, 188 insertions(+), 33 deletions(-)
 create mode 100644 scripts/systemd/airflow.conf

diff --git a/UPDATING.md b/UPDATING.md index 7b0bc97aa0902..e6c48a72b6684 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -17,6 +17,13 @@ Previously, new DAGs would be scheduled immediately. To retain the old behavior, dags_are_paused_at_creation = False ``` +### Worker, Scheduler, Webserver, Kerberos, Flower now detach by default + +The different daemons have been reworked to behave like traditional Unix daemons. This allows +you to set PID file locations and log file locations, including stdout and stderr. + +If you want to retain the old behavior, specify ```-f``` or ```--foreground``` on the command line. + ### Deprecated Features These features are marked for deprecation.
They may still work (and raise a `DeprecationWarning`), but are no longer supported and will be removed entirely in Airflow 2.0 diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 4c3cafb8052a7..3c25130c59fc0 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -13,6 +13,11 @@ from dateutil.parser import parse as parsedate import json +import daemon +from daemon.pidfile import TimeoutPIDLockFile +import signal +import sys + import airflow from airflow import jobs, settings from airflow import configuration as conf @@ -26,6 +31,34 @@ DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER')) +def sigint_handler(signal, frame): + sys.exit(0) + + +def setup_logging(filename): + root = logging.getLogger() + handler = logging.FileHandler(filename) + formatter = logging.Formatter(settings.SIMPLE_LOG_FORMAT) + handler.setFormatter(formatter) + root.addHandler(handler) + root.setLevel(settings.LOGGING_LEVEL) + + return handler.stream + + +def setup_locations(process, pid=None, stdout=None, stderr=None, log=None): + if not stderr: + stderr = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), "airflow-{}.err".format(process)) + if not stdout: + stdout = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), "airflow-{}.out".format(process)) + if not log: + log = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), "airflow-{}.log".format(process)) + if not pid: + pid = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), "airflow-{}.pid".format(process)) + + return pid, stdout, stderr, log + + def process_subdir(subdir): dags_folder = conf.get("core", "DAGS_FOLDER") dags_folder = os.path.expanduser(dags_folder) @@ -85,7 +118,6 @@ def backfill(args, dag=None): def trigger_dag(args): - session = settings.Session() # TODO: verify dag_id execution_date = datetime.now() @@ -133,7 +165,6 @@ def set_is_paused(is_paused, args, dag=None): def run(args, dag=None): - db_utils.pessimistic_connection_handling() if dag: args.dag_id = dag.dag_id @@ -343,6 +374,7 @@ def webserver(args): args.port, args.hostname)) app.run(debug=True, port=args.port, host=args.hostname) else: + pid, stdout, stderr, log_file = setup_locations("webserver", pid=args.pid) print( 'Running the Gunicorn server with {workers} {args.workerclass}' 'workers on host {args.hostname} and port ' @@ -350,8 +382,11 @@ def webserver(args): sp = subprocess.Popen([ 'gunicorn', '-w', str(args.workers), '-k', str(args.workerclass), '-t', str(args.worker_timeout), '-b', args.hostname + ':' + str(args.port), - 'airflow.www.app:cached_app()']) - sp.wait() + '-n', 'airflow-webserver', '--pid', pid, + 'airflow.www.app:cached_app()'] + ) + if args.foreground: + sp.wait() def scheduler(args): @@ -361,7 +396,28 @@ def scheduler(args): subdir=process_subdir(args.subdir), num_runs=args.num_runs, do_pickle=args.do_pickle) - job.run() + + if not args.foreground: + pid, stdout, stderr, log_file = setup_locations("scheduler", args.pid, args.stdout, args.stderr, args.log_file) + handle = setup_logging(log_file) + stdout = open(stdout, 'w+') + stderr = open(stderr, 'w+') + + ctx = daemon.DaemonContext( + pidfile=TimeoutPIDLockFile(pid, -1), + files_preserve=[handle], + stdout=stdout, + stderr=stderr, + ) + with ctx: + job.run() + + stdout.close() + stderr.close() + else: + signal.signal(signal.SIGINT, sigint_handler) + signal.signal(signal.SIGTERM, sigint_handler) + job.run() def serve_logs(args): @@ -384,10 +440,8 @@ def serve_logs(filename): # noqa def worker(args): - # Worker to serve static log files through this simple flask app env 
= os.environ.copy() env['AIRFLOW_HOME'] = settings.AIRFLOW_HOME - sp = subprocess.Popen(['airflow', 'serve_logs'], env=env) # Celery worker from airflow.executors.celery_executor import app as celery_app @@ -400,9 +454,35 @@ def worker(args): 'queues': args.queues, 'concurrency': args.concurrency, } - worker.run(**options) - sp.kill() + if not args.foreground: + pid, stdout, stderr, log_file = setup_locations("worker", args.pid, args.stdout, args.stderr, args.log_file) + handle = setup_logging(log_file) + stdout = open(stdout, 'w+') + stderr = open(stderr, 'w+') + + ctx = daemon.DaemonContext( + pidfile=TimeoutPIDLockFile(pid, -1), + files_preserve=[handle], + stdout=stdout, + stderr=stderr, + ) + with ctx: + sp = subprocess.Popen(['airflow', 'serve_logs'], env=env) + worker.run(**options) + sp.kill() + + stdout.close() + stderr.close() + else: + signal.signal(signal.SIGINT, sigint_handler) + signal.signal(signal.SIGTERM, sigint_handler) + + sp = subprocess.Popen(['airflow', 'serve_logs'], env=env) + + worker.run(**options) + sp.kill() + def initdb(args): # noqa print("DB: " + repr(settings.engine.url)) @@ -438,14 +518,55 @@ def flower(args): api = '' if args.broker_api: api = '--broker_api=' + args.broker_api - sp = subprocess.Popen(['flower', '-b', broka, port, api]) - sp.wait() + + if not args.foreground: + pid, stdout, stderr, log_file = setup_locations("flower", args.pid, args.stdout, args.stderr, args.log_file) + stdout = open(stdout, 'w+') + stderr = open(stderr, 'w+') + + ctx = daemon.DaemonContext( + pidfile=TimeoutPIDLockFile(pid, -1), + stdout=stdout, + stderr=stderr, + ) + + with ctx: + sp = subprocess.Popen(['flower', '-b', broka, port, api]) + sp.wait() + + stdout.close() + stderr.close() + else: + signal.signal(signal.SIGINT, sigint_handler) + signal.signal(signal.SIGTERM, sigint_handler) + + sp = subprocess.Popen(['flower', '-b', broka, port, api]) + sp.wait() def kerberos(args): # noqa print(settings.HEADER) import airflow.security.kerberos - airflow.security.kerberos.run() + + if not args.foreground: + pid, stdout, stderr, log_file = setup_locations("kerberos", args.pid, args.stdout, args.stderr, args.log_file) + stdout = open(stdout, 'w+') + stderr = open(stderr, 'w+') + + ctx = daemon.DaemonContext( + pidfile=TimeoutPIDLockFile(pid, -1), + stdout=stdout, + stderr=stderr, + ) + + with ctx: + airflow.security.kerberos.run() + + stdout.close() + stderr.close() + else: + airflow.security.kerberos.run() + Arg = namedtuple( 'Arg', ['flags', 'help', 'action', 'default', 'nargs', 'type', 'choices']) @@ -475,6 +596,17 @@ class CLIFactory(object): type=parsedate), 'dry_run': Arg( ("-dr", "--dry_run"), "Perform a dry run", "store_true"), + 'pid': Arg( + ("--pid", ), "PID file location", + nargs='?'), + 'foreground': Arg( + ("-f", "--foreground"), "Do not detach. 
Run in foreground", "store_true"), + 'stderr': Arg( + ("--stderr", ), "Redirect stderr to this file"), + 'stdout': Arg( + ("--stdout", ), "Redirect stdout to this file"), + 'log_file': Arg( + ("-l", "--log-file"), "Location of the log file"), # backfill 'mark_success': Arg( @@ -658,7 +790,8 @@ class CLIFactory(object): }, { 'func': kerberos, 'help': "Start a kerberos ticket renewer", - 'args': ('dag_id', 'principal', 'keytab'), + 'args': ('principal', 'keytab', 'pid', + 'foreground', 'stdout', 'stderr', 'log_file'), }, { 'func': render, 'help': "Render a task instance's template(s)", @@ -699,6 +832,7 @@ class CLIFactory(object): 'func': webserver, 'help': "Start a Airflow webserver instance", 'args': ('port', 'workers', 'workerclass', 'worker_timeout', 'hostname', + 'pid', 'foreground', 'stdout', 'stderr', 'log_file', 'debug'), }, { 'func': resetdb, @@ -711,15 +845,18 @@ class CLIFactory(object): }, { 'func': scheduler, 'help': "Start a scheduler scheduler instance", - 'args': ('dag_id_opt', 'subdir', 'num_runs', 'do_pickle'), + 'args': ('dag_id_opt', 'subdir', 'num_runs', 'do_pickle', + 'pid', 'foreground', 'stdout', 'stderr', 'log_file'), }, { 'func': worker, 'help': "Start a Celery worker node", - 'args': ('do_pickle', 'queues', 'concurrency'), + 'args': ('do_pickle', 'queues', 'concurrency', + 'pid', 'foreground', 'stdout', 'stderr', 'log_file'), }, { 'func': flower, 'help': "Start a Celery Flower", - 'args': ('flower_port', 'broker_api'), + 'args': ('flower_port', 'broker_api', + 'pid', 'foreground', 'stdout', 'stderr', 'log_file'), }, { 'func': version, 'help': "Show the version", diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index c6940e35f9048..03075834eabb0 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -122,3 +122,9 @@ def end(self): # pragma: no cover all done. 
""" raise NotImplementedError() + + def terminate(self): + """ + This method is called when the daemon receives a SIGTERM + """ + raise NotImplementedError() diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 15a89e169cd64..f13ee6d1358d3 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -18,6 +18,7 @@ def __init__(self, task_queue, result_queue): multiprocessing.Process.__init__(self) self.task_queue = task_queue self.result_queue = result_queue + self.daemon = True def run(self): while True: @@ -34,7 +35,7 @@ def run(self): state = State.SUCCESS except subprocess.CalledProcessError as e: state = State.FAILED - self.logger.error("Failed to execute task {}:".format(str(e))) + self.logger.error("failed to execute task {}:".format(str(e))) # raise e self.result_queue.put((key, state)) self.task_queue.task_done() @@ -72,3 +73,4 @@ def end(self): [self.queue.put((None, None)) for w in self.workers] # Wait for commands to finish self.queue.join() + diff --git a/airflow/jobs.py b/airflow/jobs.py index 9011e6a03e208..7a7472112ffb2 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -17,17 +17,14 @@ from __future__ import print_function from __future__ import unicode_literals -from builtins import str from past.builtins import basestring from collections import defaultdict, Counter from datetime import datetime from itertools import product import getpass import logging -import signal import socket import subprocess -import sys from time import sleep from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_ @@ -40,7 +37,6 @@ from airflow.utils.db import provide_session, pessimistic_connection_handling from airflow.utils.email import send_email from airflow.utils.logging import LoggingMixin -from airflow.utils import asciiart Base = models.Base @@ -663,18 +659,13 @@ def prioritize_queued(self, session, executor, dagbag): def _execute(self): dag_id = self.dag_id - def signal_handler(signum, frame): - self.logger.error("SIGINT (ctrl-c) received") - sys.exit(1) - signal.signal(signal.SIGINT, signal_handler) - pessimistic_connection_handling() logging.basicConfig(level=logging.DEBUG) self.logger.info("Starting the scheduler") dagbag = models.DagBag(self.subdir, sync_to_db=True) - executor = dagbag.executor + executor = self.executor = dagbag.executor executor.start() i = 0 while not self.num_runs or self.num_runs > i: diff --git a/docs/start.rst b/docs/start.rst index 7741c3ae82663..54dea66039a14 100644 --- a/docs/start.rst +++ b/docs/start.rst @@ -22,7 +22,9 @@ The installation is quick and straightforward. Upon running these commands, Airflow will create the ``$AIRFLOW_HOME`` folder and lay an "airflow.cfg" file with defaults that get you going fast. You can inspect the file either in ``$AIRFLOW_HOME/airflow.cfg``, or through the UI in -the ``Admin->Configuration`` menu. +the ``Admin->Configuration`` menu. The PID file for the webserver will be stored +in ``$AIRFLOW_HOME/airflow-webserver.pid`` or in ``/run/airflow/webserver.pid`` +if started by systemd. 
Out of the box, Airflow uses a sqlite database, which you should outgrow fairly quickly since no parallelization is possible using this database diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt index cf7769d08dfbc..0ec09d0d81783 100644 --- a/scripts/ci/requirements.txt +++ b/scripts/ci/requirements.txt @@ -7,6 +7,7 @@ coverage coveralls croniter dill +python-daemon docker-py filechunkio flake8 diff --git a/scripts/systemd/README b/scripts/systemd/README index e41731c4fcf63..c53144a04c7bd 100644 --- a/scripts/systemd/README +++ b/scripts/systemd/README @@ -1,8 +1,11 @@ -The systemd files in this directory are tested on RedHat based systems. Copy (or link) them to /usr/lib/systemd/system. +The systemd files in this directory are tested on RedHat based systems. Copy (or link) them to /usr/lib/systemd/system +and copy the airflow.conf to /etc/tmpfiles.d/ or /usr/lib/tmpfiles.d/. Copying airflow.conf ensures /run/airflow is +created with the right owner and permissions (0755 airflow airflow) + You can then start the different servers by using systemctl start . Enabling services can be done by issuing systemctl enable . By default the environment configuration points to /etc/sysconfig/airflow . You can copy the "airflow" file in this -directory and adjust it to yout liking. Make sure to specify the SCHEDULER_RUNS variabl. +directory and adjust it to your liking. Make sure to specify the SCHEDULER_RUNS variable. With some minor changes they probably work on other systemd systems. \ No newline at end of file diff --git a/scripts/systemd/airflow-webserver.service b/scripts/systemd/airflow-webserver.service index 68c1f993287e8..fa0a0ffcc3505 100644 --- a/scripts/systemd/airflow-webserver.service +++ b/scripts/systemd/airflow-webserver.service @@ -4,13 +4,17 @@ After=network.target postgresql.service mysql.service redis.service rabbitmq-ser Wants=postgresql.service mysql.service redis.service rabbitmq-server.service [Service] +PIDFile=/run/airflow/webserver.pid EnvironmentFile=/etc/sysconfig/airflow User=airflow Group=airflow -Type=simple -ExecStart=/bin/airflow webserver +Type=forking +ExecStart=/bin/airflow webserver --pid /run/airflow/webserver.pid +ExecReload=/bin/kill -s HUP $MAINPID +ExecStop=/bin/kill -s TERM $MAINPID Restart=on-failure RestartSec=42s +PrivateTmp=true [Install] -WantedBy=multi-user.target \ No newline at end of file +WantedBy=multi-user.target diff --git a/scripts/systemd/airflow.conf b/scripts/systemd/airflow.conf new file mode 100644 index 0000000000000..55a11e4209c1b --- /dev/null +++ b/scripts/systemd/airflow.conf @@ -0,0 +1 @@ +D /run/airflow 0755 airflow airflow diff --git a/setup.py b/setup.py index 4e46615cfee11..e551b12baac72 100644 --- a/setup.py +++ b/setup.py @@ -117,6 +117,7 @@ def run(self): 'chartkick>=0.4.2, < 0.5', 'croniter>=0.3.8, <0.4', 'dill>=0.2.2, <0.3', + 'python-daemon>=2.1.1, <2.2', 'flask>=0.10.1, <0.11', 'flask-admin>=1.4.0, <2.0.0', 'flask-cache>=0.13.1, <0.14',
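For reference, the self-contained sketch below is not part of the patch; it condenses the pattern the diff applies to the worker, scheduler, flower, and kerberos commands: either run in the foreground with SIGINT/SIGTERM handlers that shut a child process down cleanly, or detach via python-daemon's `DaemonContext` with a `TimeoutPIDLockFile` and redirected stdout/stderr. The `locations()` helper, the child command, and the file paths are illustrative placeholders, not names taken from the Airflow code base. Note that the patch itself installs the signal handlers only in the foreground branch and lets `DaemonContext`'s default signal map handle SIGTERM when detached; the sketch installs them in both branches for brevity.

```python
# Illustrative sketch only, not part of the patch above. It condenses the
# foreground/daemon split the patch applies to the worker, scheduler, flower,
# and kerberos commands: optionally detach with python-daemon, write a PID
# file, redirect stdout/stderr, and forward termination signals to a child
# process so it is not left behind as an orphan or zombie.
# The child command ("sleep", "3600") and all file locations are placeholders.
import os
import signal
import subprocess
import sys

import daemon  # python-daemon, added to setup.py by this patch
from daemon.pidfile import TimeoutPIDLockFile

HOME = os.path.expanduser("~/airflow")


def locations(process):
    """Default pid/out/err paths, mimicking the patch's setup_locations()."""
    return tuple(os.path.join(HOME, "airflow-{}.{}".format(process, ext))
                 for ext in ("pid", "out", "err"))


def serve():
    # Stand-in for the `airflow serve_logs` child that the worker spawns.
    child = subprocess.Popen(["sleep", "3600"])

    def shutdown(signum, frame):
        # Propagate termination to the child before exiting, so the child
        # does not end up orphaned when the parent receives SIGTERM/SIGINT.
        child.terminate()
        child.wait()
        sys.exit(0)

    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)
    child.wait()


if __name__ == "__main__":
    if "--foreground" in sys.argv or "-f" in sys.argv:
        serve()
    else:
        pid, out, err = locations("example")
        # DaemonContext forks, detaches from the terminal, and acquires the
        # PID lock; stdout/stderr of the daemonized process go to the files.
        with daemon.DaemonContext(pidfile=TimeoutPIDLockFile(pid, -1),
                                  stdout=open(out, "w+"),
                                  stderr=open(err, "w+")):
            serve()
```

With this layout a service manager only needs the PID file: sending SIGTERM to the PID recorded in `airflow-example.pid` runs the handler, which terminates the child before the parent exits. That is the contract the systemd unit in this patch relies on with `Type=forking`, `PIDFile=`, and `ExecStop=/bin/kill -s TERM $MAINPID`.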