dag.py
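"""Airflow DAG for the adflow ingestion tutorial.

Waits for a daily CSV to land in HDFS, (re)creates a Hive database, loads
the raw data into Hive, branches on the row count, cleans the data, moves
it out of Hive, and sends a completion email.
"""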
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.hive_operator import HiveOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.sensors import WebHdfsSensor

# Helper callables and HQL templates from the tutorial's own adflow package.
from adflow.ingestion.tutorial import tasks
from adflow.ingestion.tutorial import hql

logger = logging.getLogger(__name__)

DAG_ID = 'my-bigdata-dag'

default_args = {
    'owner': 'Mehmet Vergili',
    'start_date': datetime(2017, 11, 20),
    'depends_on_past': False,
    'email': '[email protected]',
    # email_on_failure / email_on_retry are boolean flags; the recipient
    # address comes from the 'email' key above.
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(dag_id=DAG_ID,
          default_args=default_args,
          schedule_interval=timedelta(days=1))
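
# With a daily schedule_interval and a start_date in the past, Airflow will
# by default backfill one run per day from 2017-11-20 onward; set
# catchup=False on the DAG if only new runs are wanted.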

# Wait for the day's source file to land in HDFS before doing anything else.
# Note: with a 5s timeout and a 10s poke_interval, the sensor gives up after
# its first unsuccessful poke; these values are only suitable for a demo.
source_data_sensor = WebHdfsSensor(
    task_id='source_data_sensor',
    filepath='/data/mydata/{{ ds }}/mydata.csv',
    poke_interval=10,
    timeout=5,
    dag=dag,
)

# Recreate the target Hive database from scratch on every run.
create_hive_db = HiveOperator(
    task_id='create_hive_db',
    hql="DROP DATABASE IF EXISTS {db} CASCADE; CREATE DATABASE {db};".format(db='my_hive_db'),
    dag=dag,
)
create_hive_db.set_upstream(source_data_sensor)

# Load the day's raw CSV from HDFS into a Hive table via a temporary table.
hdfs_to_hive_transfer = HiveOperator(
    task_id='hdfs_to_hive_transfer',
    hql=hql.HQL_HDFS_TO_HIVE_TRANSFER.format(table_name='mydata',
                                             tmp_table_name='mydata_tmp',
                                             hdfs_path='/data/mydata/{{ ds }}'),
    schema='my_hive_db',
    dag=dag,
)
hdfs_to_hive_transfer.set_upstream(create_hive_db)

# Branch on the row count of the loaded data: the callable returns the
# task_id of the branch to follow, either 'stop_flow' or 'clean_data'.
count_data_rows = BranchPythonOperator(
    task_id='count_data_rows',
    python_callable=tasks.count_data_rows,
    templates_dict={'schema': 'my_hive_db'},
    provide_context=True,
    dag=dag,
)
count_data_rows.set_upstream(hdfs_to_hive_transfer)
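
# A minimal sketch of what tasks.count_data_rows could look like; the real
# implementation ships in the adflow tutorial package and is not shown here.
# A BranchPythonOperator callable must return a downstream task_id, so a
# hypothetical version would be:
#
#     def count_data_rows(templates_dict=None, **context):
#         schema = templates_dict['schema']
#         row_count = ...  # e.g. SELECT COUNT(*) on the loaded table via a Hive hook
#         return 'clean_data' if row_count > 0 else 'stop_flow'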

# No-op endpoint for the empty-data branch.
stop_flow = DummyOperator(
    task_id='stop_flow',
    dag=dag,
)

# Generate a source id for this run; downstream tasks read it via XCom.
create_source_id = PythonOperator(
    task_id='create_source_id',
    python_callable=tasks.create_source_id,
    templates_dict={'source': 'mydata'},
    provide_context=True,
    dag=dag,
)
create_source_id.set_upstream(source_data_sensor)

# Clean the raw table into clean_mydata, tagging rows with the source id
# pulled from the create_source_id task via XCom.
clean_data = HiveOperator(
    task_id='clean_data',
    hql=hql.HQL_CLEAN_DATA.format(source_id="{{ task_instance.xcom_pull(task_ids='create_source_id') }}",
                                  clean_mydata='clean_mydata',
                                  mydata='mydata'),
    schema='my_hive_db',
    dag=dag,
)
clean_data.set_upstream(create_source_id)
count_data_rows.set_downstream([stop_flow, clean_data])

# Export the cleaned data out of Hive (note the tutorial's helper is named
# move_data_mssql, although the task_id says mysql).
move_data_mysql = PythonOperator(
    task_id='move_data_mysql',
    python_callable=tasks.move_data_mssql,
    templates_dict={'schema': 'my_hive_db'},
    provide_context=True,
    dag=dag,
)
move_data_mysql.set_upstream(clean_data)

# Notify once the day's ingestion has completed.
send_email = EmailOperator(
    task_id='send_email',
    to='[email protected]',
    subject='ingestion complete',
    html_content="Date: {{ ds }}",
    dag=dag,
)
send_email.set_upstream(move_data_mysql)
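
# Resulting task graph:
#
#   source_data_sensor -> create_hive_db -> hdfs_to_hive_transfer -> count_data_rows
#   count_data_rows -> stop_flow                  (branch: no data)
#   count_data_rows -> clean_data                 (branch: data present)
#   source_data_sensor -> create_source_id -> clean_data
#   clean_data -> move_data_mysql -> send_email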