Skip to content

Commit

Permalink
Merge pull request apache#975 from jtschoonhoven/issue-974
Browse files Browse the repository at this point in the history
statuses column on /admin shows only active or most recent dag_runs
  • Loading branch information
bolkedebruin committed Mar 14, 2016
2 parents e2f183c + 254d583 commit 2e6447b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 5 deletions.
4 changes: 3 additions & 1 deletion airflow/www/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ <h2>DAGs</h2>
<th>DAG</th>
<th>Schedule</th>
<th>Owner</th>
<th style="padding-left: 5px;">Statuses
<th style="padding-left: 5px;">Recent Statuses
<span id="statuses_info" class="glyphicon glyphicon-info-sign" aria-hidden="true" title="Status of tasks from all active DAG runs or, if not currently active, from most recent run."></span>
<img id="loading" width="15" src="{{ url_for("static", filename="loading.gif") }}">
</th>
<th class="text-center">Links</th>
Expand Down Expand Up @@ -211,6 +212,7 @@ <h2>DAGs</h2>
d3.select("#loading").remove();
}
$("#pause_header").tooltip();
$("#statuses_info").tooltip();

$("circle").tooltip({
html: true,
Expand Down
31 changes: 27 additions & 4 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import traceback

import sqlalchemy as sqla
from sqlalchemy import or_, desc
from sqlalchemy import or_, desc, and_


from flask import redirect, url_for, request, Markup, Response, current_app, render_template
Expand Down Expand Up @@ -548,13 +548,36 @@ def dag_stats(self):
task_ids += dag.task_ids
if not dag.is_subdag:
dag_ids.append(dag.dag_id)

TI = models.TaskInstance
DagRun = models.DagRun
session = Session()

LastDagRun = (
session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
.group_by(DagRun.dag_id)
.subquery('last_dag_run')
)

# Select all task_instances from active dag_runs.
# If no dag_run is active, return task instances from most recent dag_run.
qry = (
session.query(TI.dag_id, TI.state, sqla.func.count(TI.task_id))
.filter(TI.task_id.in_(task_ids))
.filter(TI.dag_id.in_(dag_ids))
.group_by(TI.dag_id, TI.state)
.outerjoin(DagRun, and_(
DagRun.dag_id == TI.dag_id,
DagRun.execution_date == TI.execution_date,
DagRun.state == State.RUNNING))
.outerjoin(LastDagRun, and_(
LastDagRun.c.dag_id == TI.dag_id,
LastDagRun.c.execution_date == TI.execution_date)
)
.filter(TI.task_id.in_(task_ids))
.filter(TI.dag_id.in_(dag_ids))
.filter(or_(
DagRun.dag_id != None,
LastDagRun.c.dag_id != None
))
.group_by(TI.dag_id, TI.state)
)

data = {}
Expand Down

0 comments on commit 2e6447b

Please sign in to comment.