Merge pull request apache#855 from bolkedebruin/ISSUE-852
Use proper signal handling and cascade signals to children (Fix apache#852)
bolkedebruin committed Apr 6, 2016
2 parents 81ff5cc + e8c1144 commit 4865ee6
Showing 11 changed files with 188 additions and 33 deletions.
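With the new per-process PID files, a detached daemon can be stopped by signalling it directly; the commit's stated intent is that such signals are handled cleanly and cascaded to child processes (for example the log-serving subprocess started by the worker). A minimal usage sketch, assuming the default PID file location introduced in this change:

```
# Stop a detached scheduler via its PID file; the default location is
# $AIRFLOW_HOME/airflow-scheduler.pid. The daemon is expected to shut down
# its children before exiting.
kill -TERM $(cat $AIRFLOW_HOME/airflow-scheduler.pid)
```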
7 changes: 7 additions & 0 deletions UPDATING.md
@@ -17,6 +17,13 @@ Previously, new DAGs would be scheduled immediately. To retain the old behavior,
dags_are_paused_at_creation = False
```

### Worker, Scheduler, Webserver, Kerberos, Flower now detach by default

The different daemons have been reworked to behave like traditional Unix daemons. This allows
you to set PID file locations and log file locations, including separate files for stdout and stderr.

If you want to retain the old behavior, specify ```-f``` or ```--foreground``` on the command line.
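As a sketch of how the new options combine (the paths below are illustrative, not the defaults; by default the files are created in `$AIRFLOW_HOME` as `airflow-<process>.pid`, `.out`, `.err` and `.log`):

```
# Run the scheduler detached (the new default) with explicit PID and log locations.
airflow scheduler --pid /var/run/airflow/scheduler.pid \
                  --stdout /var/log/airflow/scheduler.out \
                  --stderr /var/log/airflow/scheduler.err \
                  --log-file /var/log/airflow/scheduler.log

# Keep the old attached behavior instead:
airflow scheduler --foreground
```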

### Deprecated Features
These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer supported and will be removed entirely in Airflow 2.0

169 changes: 153 additions & 16 deletions airflow/bin/cli.py
@@ -13,6 +13,11 @@
from dateutil.parser import parse as parsedate
import json

import daemon
from daemon.pidfile import TimeoutPIDLockFile
import signal
import sys

import airflow
from airflow import jobs, settings
from airflow import configuration as conf
@@ -26,6 +31,34 @@
DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))


def sigint_handler(signal, frame):
    sys.exit(0)


def setup_logging(filename):
    root = logging.getLogger()
    handler = logging.FileHandler(filename)
    formatter = logging.Formatter(settings.SIMPLE_LOG_FORMAT)
    handler.setFormatter(formatter)
    root.addHandler(handler)
    root.setLevel(settings.LOGGING_LEVEL)

    return handler.stream


def setup_locations(process, pid=None, stdout=None, stderr=None, log=None):
    if not stderr:
        stderr = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), "airflow-{}.err".format(process))
    if not stdout:
        stdout = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), "airflow-{}.out".format(process))
    if not log:
        log = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), "airflow-{}.log".format(process))
    if not pid:
        pid = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), "airflow-{}.pid".format(process))

    return pid, stdout, stderr, log


def process_subdir(subdir):
    dags_folder = conf.get("core", "DAGS_FOLDER")
    dags_folder = os.path.expanduser(dags_folder)
@@ -85,7 +118,6 @@ def backfill(args, dag=None):


def trigger_dag(args):

    session = settings.Session()
    # TODO: verify dag_id
    execution_date = datetime.now()
@@ -133,7 +165,6 @@ def set_is_paused(is_paused, args, dag=None):


def run(args, dag=None):

    db_utils.pessimistic_connection_handling()
    if dag:
        args.dag_id = dag.dag_id
@@ -343,15 +374,19 @@ def webserver(args):
                args.port, args.hostname))
        app.run(debug=True, port=args.port, host=args.hostname)
    else:
        pid, stdout, stderr, log_file = setup_locations("webserver", pid=args.pid)
        print(
            'Running the Gunicorn server with {workers} {args.workerclass}'
            'workers on host {args.hostname} and port '
            '{args.port} with a timeout of {worker_timeout}...'.format(**locals()))
        sp = subprocess.Popen([
            'gunicorn', '-w', str(args.workers), '-k', str(args.workerclass),
            '-t', str(args.worker_timeout), '-b', args.hostname + ':' + str(args.port),
            'airflow.www.app:cached_app()'])
        sp.wait()
            '-n', 'airflow-webserver', '--pid', pid,
            'airflow.www.app:cached_app()']
        )
        if args.foreground:
            sp.wait()


def scheduler(args):
@@ -361,7 +396,28 @@ def scheduler(args):
        subdir=process_subdir(args.subdir),
        num_runs=args.num_runs,
        do_pickle=args.do_pickle)
    job.run()

    if not args.foreground:
        pid, stdout, stderr, log_file = setup_locations("scheduler", args.pid, args.stdout, args.stderr, args.log_file)
        handle = setup_logging(log_file)
        stdout = open(stdout, 'w+')
        stderr = open(stderr, 'w+')

        ctx = daemon.DaemonContext(
            pidfile=TimeoutPIDLockFile(pid, -1),
            files_preserve=[handle],
            stdout=stdout,
            stderr=stderr,
        )
        with ctx:
            job.run()

        stdout.close()
        stderr.close()
    else:
        signal.signal(signal.SIGINT, sigint_handler)
        signal.signal(signal.SIGTERM, sigint_handler)
        job.run()


def serve_logs(args):
@@ -384,10 +440,8 @@ def serve_logs(filename): # noqa


def worker(args):
    # Worker to serve static log files through this simple flask app
    env = os.environ.copy()
    env['AIRFLOW_HOME'] = settings.AIRFLOW_HOME
    sp = subprocess.Popen(['airflow', 'serve_logs'], env=env)

    # Celery worker
    from airflow.executors.celery_executor import app as celery_app
@@ -400,9 +454,35 @@
        'queues': args.queues,
        'concurrency': args.concurrency,
    }
    worker.run(**options)
    sp.kill()

    if not args.foreground:
        pid, stdout, stderr, log_file = setup_locations("worker", args.pid, args.stdout, args.stderr, args.log_file)
        handle = setup_logging(log_file)
        stdout = open(stdout, 'w+')
        stderr = open(stderr, 'w+')

        ctx = daemon.DaemonContext(
            pidfile=TimeoutPIDLockFile(pid, -1),
            files_preserve=[handle],
            stdout=stdout,
            stderr=stderr,
        )
        with ctx:
            sp = subprocess.Popen(['airflow', 'serve_logs'], env=env)
            worker.run(**options)
            sp.kill()

        stdout.close()
        stderr.close()
    else:
        signal.signal(signal.SIGINT, sigint_handler)
        signal.signal(signal.SIGTERM, sigint_handler)

        sp = subprocess.Popen(['airflow', 'serve_logs'], env=env)

        worker.run(**options)
        sp.kill()


def initdb(args): # noqa
print("DB: " + repr(settings.engine.url))
@@ -438,14 +518,55 @@ def flower(args):
    api = ''
    if args.broker_api:
        api = '--broker_api=' + args.broker_api
    sp = subprocess.Popen(['flower', '-b', broka, port, api])
    sp.wait()

    if not args.foreground:
        pid, stdout, stderr, log_file = setup_locations("flower", args.pid, args.stdout, args.stderr, args.log_file)
        stdout = open(stdout, 'w+')
        stderr = open(stderr, 'w+')

        ctx = daemon.DaemonContext(
            pidfile=TimeoutPIDLockFile(pid, -1),
            stdout=stdout,
            stderr=stderr,
        )

        with ctx:
            sp = subprocess.Popen(['flower', '-b', broka, port, api])
            sp.wait()

        stdout.close()
        stderr.close()
    else:
        signal.signal(signal.SIGINT, sigint_handler)
        signal.signal(signal.SIGTERM, sigint_handler)

        sp = subprocess.Popen(['flower', '-b', broka, port, api])
        sp.wait()


def kerberos(args): # noqa
    print(settings.HEADER)
    import airflow.security.kerberos
    airflow.security.kerberos.run()

    if not args.foreground:
        pid, stdout, stderr, log_file = setup_locations("kerberos", args.pid, args.stdout, args.stderr, args.log_file)
        stdout = open(stdout, 'w+')
        stderr = open(stderr, 'w+')

        ctx = daemon.DaemonContext(
            pidfile=TimeoutPIDLockFile(pid, -1),
            stdout=stdout,
            stderr=stderr,
        )

        with ctx:
            airflow.security.kerberos.run()

        stdout.close()
        stderr.close()
    else:
        airflow.security.kerberos.run()


Arg = namedtuple(
    'Arg', ['flags', 'help', 'action', 'default', 'nargs', 'type', 'choices'])
@@ -475,6 +596,17 @@ class CLIFactory(object):
            type=parsedate),
        'dry_run': Arg(
            ("-dr", "--dry_run"), "Perform a dry run", "store_true"),
        'pid': Arg(
            ("--pid", ), "PID file location",
            nargs='?'),
        'foreground': Arg(
            ("-f", "--foreground"), "Do not detach. Run in foreground", "store_true"),
        'stderr': Arg(
            ("--stderr", ), "Redirect stderr to this file"),
        'stdout': Arg(
            ("--stdout", ), "Redirect stdout to this file"),
        'log_file': Arg(
            ("-l", "--log-file"), "Location of the log file"),

        # backfill
        'mark_success': Arg(
@@ -658,7 +790,8 @@ class CLIFactory(object):
        }, {
            'func': kerberos,
            'help': "Start a kerberos ticket renewer",
            'args': ('dag_id', 'principal', 'keytab'),
            'args': ('principal', 'keytab', 'pid',
                     'foreground', 'stdout', 'stderr', 'log_file'),
        }, {
            'func': render,
            'help': "Render a task instance's template(s)",
@@ -699,6 +832,7 @@ class CLIFactory(object):
            'func': webserver,
            'help': "Start an Airflow webserver instance",
            'args': ('port', 'workers', 'workerclass', 'worker_timeout', 'hostname',
                     'pid', 'foreground', 'stdout', 'stderr', 'log_file',
                     'debug'),
        }, {
            'func': resetdb,
@@ -711,15 +845,18 @@
        }, {
            'func': scheduler,
            'help': "Start a scheduler instance",
            'args': ('dag_id_opt', 'subdir', 'num_runs', 'do_pickle'),
            'args': ('dag_id_opt', 'subdir', 'num_runs', 'do_pickle',
                     'pid', 'foreground', 'stdout', 'stderr', 'log_file'),
        }, {
            'func': worker,
            'help': "Start a Celery worker node",
            'args': ('do_pickle', 'queues', 'concurrency'),
            'args': ('do_pickle', 'queues', 'concurrency',
                     'pid', 'foreground', 'stdout', 'stderr', 'log_file'),
        }, {
            'func': flower,
            'help': "Start a Celery Flower",
            'args': ('flower_port', 'broker_api'),
            'args': ('flower_port', 'broker_api',
                     'pid', 'foreground', 'stdout', 'stderr', 'log_file'),
        }, {
            'func': version,
            'help': "Show the version",
6 changes: 6 additions & 0 deletions airflow/executors/base_executor.py
@@ -122,3 +122,9 @@ def end(self): # pragma: no cover
        all done.
        """
        raise NotImplementedError()

    def terminate(self):
        """
        This method is called when the daemon receives a SIGTERM
        """
        raise NotImplementedError()
4 changes: 3 additions & 1 deletion airflow/executors/local_executor.py
@@ -18,6 +18,7 @@ def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.daemon = True

    def run(self):
        while True:
@@ -34,7 +35,7 @@ def run(self):
                state = State.SUCCESS
            except subprocess.CalledProcessError as e:
                state = State.FAILED
                self.logger.error("Failed to execute task {}:".format(str(e)))
                self.logger.error("failed to execute task {}:".format(str(e)))
                # raise e
            self.result_queue.put((key, state))
            self.task_queue.task_done()
@@ -72,3 +73,4 @@ def end(self):
        [self.queue.put((None, None)) for w in self.workers]
        # Wait for commands to finish
        self.queue.join()

11 changes: 1 addition & 10 deletions airflow/jobs.py
@@ -17,17 +17,14 @@
from __future__ import print_function
from __future__ import unicode_literals

from builtins import str
from past.builtins import basestring
from collections import defaultdict, Counter
from datetime import datetime
from itertools import product
import getpass
import logging
import signal
import socket
import subprocess
import sys
from time import sleep

from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_
@@ -40,7 +37,6 @@
from airflow.utils.db import provide_session, pessimistic_connection_handling
from airflow.utils.email import send_email
from airflow.utils.logging import LoggingMixin
from airflow.utils import asciiart


Base = models.Base
@@ -663,18 +659,13 @@ def prioritize_queued(self, session, executor, dagbag):
    def _execute(self):
        dag_id = self.dag_id

        def signal_handler(signum, frame):
            self.logger.error("SIGINT (ctrl-c) received")
            sys.exit(1)
        signal.signal(signal.SIGINT, signal_handler)

        pessimistic_connection_handling()

        logging.basicConfig(level=logging.DEBUG)
        self.logger.info("Starting the scheduler")

        dagbag = models.DagBag(self.subdir, sync_to_db=True)
        executor = dagbag.executor
        executor = self.executor = dagbag.executor
        executor.start()
        i = 0
        while not self.num_runs or self.num_runs > i:
4 changes: 3 additions & 1 deletion docs/start.rst
@@ -22,7 +22,9 @@ The installation is quick and straightforward.
Upon running these commands, Airflow will create the ``$AIRFLOW_HOME`` folder
and lay an "airflow.cfg" file with defaults that get you going fast. You can
inspect the file either in ``$AIRFLOW_HOME/airflow.cfg``, or through the UI in
the ``Admin->Configuration`` menu.
the ``Admin->Configuration`` menu. The PID file for the webserver will be stored
in ``$AIRFLOW_HOME/airflow-webserver.pid`` or in ``/run/airflow/webserver.pid``
if started by systemd.
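For instance, the PID file gives operators a handle on a detached webserver; the following is a usage sketch assuming the non-systemd default location:

```
# Show the PID recorded for the detached webserver (the file is passed through to gunicorn)
cat $AIRFLOW_HOME/airflow-webserver.pid

# Ask it to shut down
kill $(cat $AIRFLOW_HOME/airflow-webserver.pid)
```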

Out of the box, Airflow uses a sqlite database, which you should outgrow
fairly quickly since no parallelization is possible using this database
1 change: 1 addition & 0 deletions scripts/ci/requirements.txt
@@ -7,6 +7,7 @@ coverage
coveralls
croniter
dill
python-daemon
docker-py
filechunkio
flake8