ืฉืืื, ืืืจ! ืืืืืจ ืื ืื ื ืจืืฆื ืืืืจ ืขื ืืื ืืื ื ืืืจ ืืคืืชืื ืชืืืืื ืขืืืื ื ืชืื ืื ืืฆืืื, ืืืฉื, ืืชืฉืชืืช ืฉื DWH ืืจืืื ื ืื DataLake ืฉืื. ื ืืืจ ืขื Apache Airflow (ืืืื Airflow). ืืื ื ืฉืืืช ืืืืคื ืื ืืืื ืชืฉืืืช ืื ืขื Habrรฉ, ืืืขืืงืจ ืื ืกื ืืฉืื ืข ืืืชื ืฉืืคืืืช Airflow ืฉืืื ืืืกืชืื ืืืฉืจ ืืชื ืืืืจ ืืชืืื ืืชืืืืื ื-ETL/ELT ืฉืื.
ืืขืืจ ืืชืืชื ืกืืจืช ืืืืจืื ืื ืืฉื DWH ืืฉืขืืืชื ืืื ืง ืืื ืงืืฃ. ืืขืช ืืคืืชื ืืืืง ืืฆืืืช Mail.Ru Group ืืื ื ืืคืชื ืคืืืคืืจืื ืื ืืชืื ื ืชืื ืื ืืชืืื ืืืฉืืงืื. ืืืขืฉื, ืืฉืืืคืืขื ืืืฉืืช ืืคืชืจืื ืืช ืืขื ืืื ืื, ืืฆืืืช ืฉืื ืืื ื ื ืืืจ ืืื ืขื ืืคืืืคืืจืื ืฉืื ื ืื ืืชืื ื ืชืื ืื.
ืคืจืืืื
ืื ืืืื ื ืชืืื. ืืื ืืจืืืช ืืืืืจ? ืืืื ืกืคืจืืื (ืื
ืขืืฉืื ืืืื ื ืกืชืื ืขื ืืืฉืืืืช ืืขืืงืจืืืช ืฉื Airflow. ืขื ืืื ืืื ืช ืืืืืช ืืืืืจื ืฉืืื, ืืชื ืืืื ืืืจืื ืืฆืืจื ืืืืืืช ืืช ืืจืืืืงืืืจืช ืืชืืืื ืฉืื. ืืืื ืืืฉืืช ืืขืืงืจืืช ืืื ืืืจืฃ ืืืฆืืงืื ืืืืืื (ืืืื DAG).
DAG
DAG ืืื ืืืืฉืื ืฉืืื ืืฉืืขืืชื ืฉื ืืืฉืืืืช ืฉืื ืฉืืชื ืจืืฆื ืืืฉืืื ืืจืฆืฃ ืืืืืจ ืืืืื ืืคื ืืื ืืื ืื ืกืคืฆืืคื. Airflow ืืกืคืง ืืืฉืง ืืื ืืจื ื ื ืื ืืขืืืื ืขื DAGs ืืืฉืืืืช ืืืจืืช:
ื-DAG ืขืฉืื ืืืืจืืืช ืื:
ืืืื, ืืขืช ืชืื ืื DAG, ืงืืืข ืืขืจื ืืคืขืืืื ืฉืขืืืื ืืืื ื ืืฉืืืืช ืืชืื ื-DAG. ืืื ืื ื ืืืืขืื ืืืฉืืช ืืฉืืื ื ืืกืคืช: ืืคืขืื ืืจืืืช ืืืืืจ.
ืืืคืจืืืจืื
ืืคืขืื ืืื ืืฉืืช ืฉืขื ืืกืืกื ื ืืฆืจืื ืืืคืขื ืขืืืื, ืืืชืืจืื ืื ืืงืจื ืืืืื ืืืฆืืข ืืืคืข ืขืืืื.
- BashOperator - ืืืคืจืืืจ ืืืืฆืืข ืคืงืืืช bash.
- PythonOperator - ืืคืขืื ืืงืจืืื ืืงืื Python.
- EmailOperator โ ืืืคืจืืืจ ืืฉืืืืช ืืืืจ ืืืงืืจืื ื.
- HTTPOperator - ืืืคืจืืืจ ืืขืืืื ืขื ืืงืฉืืช http.
- SqlOperator - ืืืคืจืืืจ ืืืืฆืืข ืงืื SQL.
- ืกื ืกืืจ ืืื ืืคืขืื ืืืืชื ื ืืืืจืืข (ืืืขืช ืืฉืขื ืื ืืจืฉืช, ืืืคืขืช ืืงืืืฅ ืื ืืจืฉ, ืฉืืจื ืืืืืจ, ืชืืืื ืื-API ืืื' ืืื').
ืืฉื ื ืืืคืจืืืจืื ืกืคืฆืืคืืื ืืืชืจ: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
ืืชื ืืืื ืื ืืคืชื ืืืคืจืืืจืื ืขื ืกืื ืืืืคืืื ืื ืฉืื ืืืืฉืชืืฉ ืืื ืืคืจืืืงื ืฉืื. ืืืืืื, ืืฆืจื ื ืืช MongoDBToHiveViaHdfsTransfer, ืืืคืจืืืจ ืืืืฆืื ืืกืืืื ื-MongoDB ื-Hive, ืืืื ืืืคืจืืืจืื ืืขืืืื ืขื
ืืืืจ ืืื, ืืฉ ืืืฆืข ืืช ืื ืืืงืจืื ืืืื ืฉื ืืฉืืืืช, ืืขืืฉืื ื ืืืจ ืขื ืืืชืืื.
ืืชืืื
ืืชืืื ืืืฉืืืืช ืฉื Airflow ืื ืื
ืืื ืืจืืื ืืฉ ืืืืื ืขื ืืกืคืจ ืืืฉืืฆืืช. ืืขืช ืืฆืืจืช DAG, ืืื ืืงืื ืืืืจ:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
ื ืืชื ืืขืงืืฃ ืืืืจ ืฉืืืืืจ ืืจืืช DAG ืืจืืช ืืืฉืืื.
ืชืืืื ื ืคืจื, Scheduler, ืืืจืื ืขื ืชืืืื ืื ืืืฉืืืืช ื-Airflow. ืืืขืฉื, ืืชืืื ืขืืกืง ืืื ืืืื ืืงื ืฉื ืืืืจืช ืืฉืืืืช ืืืืฆืืข. ืืืฉืืื ืขืืืจืช ืืกืคืจ ืฉืืืื ืืคื ื ืืืฆืืข:
- ืืืฉืืืืช ืืงืืืืืช ืืืฉืืื ื-DAG; ืืฉืืืืช ืืืฉืืช ื ืืชื ืืขืืื ืืชืืจ.
- ืืชืืจ ืืืืื ืืืชืื ืืขืืืคืืช ืืืฉืืืืช (ื ืืชื ืื ืืฉืืื ืืกืืจื ืืขืืืคืืืืช), ืืื ืืฉ ืืฉืืฆืช ืคื ืืื ืืืจืืื, ื ืืชื ืืงืืช ืืช ืืืฉืืื ืืคืขืืื.
- ืื ืืฉ ืกืืจื ืขืืื ืืื ื, ืืืฉืืื ื ืฉืืืช ืืืื; ืืขืืืื ืฉืชืื ืชืช ืืืขืื ืืชืืืื, ืืืืฆืขืืช ืืืคืจืืืจ ืืื ืื ืืืจ.
ืคืฉืื ืืกืคืืง.
ืืชืืื ืคืืขื ืขื ืืกื ืฉื ืื DAGs ืืื ืืืฉืืืืช ืืชืื DAGs.
ืืื ืฉืืชืืื ืืชืืื ืืขืืื ืขื DAG, ื-DAG ืฆืจืื ืืืืืืจ ืืื ืืื ืื:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
ืืฉ ืงืืืฆื ืฉื ืืืืจืืช ืืืื ืืช ืืจืืฉ: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
ืืชื ืืืื ืื ืืืฉืชืืฉ ืืืืืืื cron:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
ืชืืจืื ืืืฆืืข
ืืื ืืืืื ืืืฆื ืคืืขืืช Airflow, ืืฉืื ืืืืื ืืื ืชืืจืื ืืืฆืืข ืขืืืจ DAG. ื-Airflow, ื-DAG ืืฉ ืืื ืชืืจืื ืืืฆืืข, ืืืืืจ, ืืืชืื ืืืื ืืืื ืื ืฉื ืืขืืืื ืฉื ื-DAG, ื ืืฆืจืื ืืืคืขื ืืฉืืืืช ืขืืืจ ืื ืชืืจืื ืืืฆืืข. ืืืื ืชืืจืื ืืืฆืืข, ื ืืชื ืืืฆืข ืืืืฉ ืืฉืืืืช - ืื, ืืืฉื, DAG ืืืื ืืขืืื ืื-ืืื ืืช ืืืกืคืจ ืชืืจืืื ืืืฆืืข. ืื ืืืฆื ืืื ืืืืจืืจ:
ืืืจืื ืืฆืขืจ (ืื ืืืื ืืืจืื ืืืื: ืื ืชืืื ืืืฆื), ืื ืืืฉืื ืืืฉืืื ื-DAG ืืชืืงื, ืื ืืืืฆืืข ืืชืืจืื ืืืืฆืืข ืืงืืื ืืืฉืื ืชืื ืืชืืฉืืืช ืืืชืืืืช. ืื ืืื ืื ืืชื ืฆืจืื ืืืฉื ืืืืฉ ื ืชืื ืื ืืชืงืืคืืช ืงืืืืืช ืืืืฆืขืืช ืืืืืจืืชื ืืืฉ, ืืื ืื ืจืข ืื ืืืืืช ืืฉืืืืจ ืฉื ืืชืืฆืื ืืืืืช (ืืืืื, ืืฃ ืืื ืื ืืคืจืืข ืื ืืืืืืจ ืืช ืืืจืกื ืื ืืจืฉืช ืฉื ืงืื ืืืงืืจ ื-Git ืืืืฉื ืื ืืชื ืฆืจืื ืคืขื ืืืช, ืืื ืฉืืชื ืฆืจืื ืืืชื).
ืืฆืืจืช ืืฉืืืืช
ืืืืฉืื ืฉื ื-DAG ืืื ืงืื ื-Python, ืื ืฉืืฉ ืื ื ืืจื ืืืื ื ืืื ืืืคืืืช ืืช ืืืืช ืืงืื ืืฉืขืืืืื, ืืืฉื, ืขื ืืงืืจืืช ืืคืืฆืืื. ื ื ืื ืฉืืฉ ืื ืฉืืืฉื ืจืกืืกื MySQL ืืืงืืจ, ืืชื ืฆืจืื ืืืคืก ืืื ืืื ืืื ืืืืกืืฃ ืืื ื ืชืื ืื. ืืืช ืืขืื, ืืืืคื ืขืฆืืื ืืืืงืืื. ืงืื Python ื-DAG ืขืฉืื ืืืืจืืืช ืื:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
ื-DAG ื ืจืื ืื:
ืืืงืจื ืื, ืืชื ืืืื ืืืืกืืฃ ืื ืืืกืืจ ืจืกืืก ืคืฉืื ืขื ืืื ืืชืืืช ืืืืืจืืช ืืขืืืื ื-DAG. ื ืึนืึท!
ืืชื ืืืื ืื ืืืฉืชืืฉ ืืืฆืืจืช ืงืื ืืืจืื ืืืชืจ, ืืืฉื, ืืขืืื ืขื ืืงืืจืืช ืืฆืืจื ืฉื ืืกื ื ืชืื ืื ืื ืืชืืจ ืืื ื ืืืื, ืืืืืจืืชื ืืขืืืื ืขื ืืืื, ืืืืชืืฉื ืืชืืื ืืช ืฉื ืชืฉืชืืช DWH, ืืืฆืืจ ืชืืืื ืืืขืื ืช N ืืืืืืช ืืืืกืื ืฉืื. ืื, ืืืฉื, ืขืืืื ืขื API ืฉืืื ื ืชืืื ืืขืืืื ืขื ืคืจืืืจ ืืฆืืจืช ืจืฉืืื, ื ืืชื ืืืฆืืจ N ืืฉืืืืช ื-DAG ืืชืื ืจืฉืืื ืื, ืืืืืื ืืช ืืืงืืื ืฉื ืืงืฉืืช ื-API ืืืืืจ ืืืืจื ืื ืชืื ืื ืืืจืืฉืื ืื-API. ืึธืึดืืฉื!
ืืืืจ
ื-Airflow ืืฉ ืืืืจ ืืืืจื ืืฉืื, ืืกื ื ืชืื ืื (ืืืื ืืืืืช MySQL ืื Postgres, ืืฉ ืื ื Postgres), ืืืืืกื ืืช ืืฆืื ืืืฉืืืืช, DAGs, ืืืืจืืช ืืืืืจ, ืืฉืชื ืื ืืืืืืืื ืืื' ืืื'. ืืื ืืืืชื ืจืืฆื ืืืืจ ืฉ- ืืืืจ ื-Airflow ืืื ืคืฉืื ืืืื (ืืขืจื 20 ืืืืืืช) ืื ืื ืื ืืชื ืจืืฆื ืืื ืืช ืชืืืืืื ืืฉืื ืขื ืืืื. ืื ื ืืืืจ ืืช 100500 ืืืืืืืช ืืืืืจ ืืื ืคืืจืืืืงื, ืฉืืื ืฆืจืื ืืืืื ืืจืื ืืื ืืคื ื ืฉืืืื ื ืืื ืืื ืืช ืฉืืืืชื.
ื ืืืืจ
ืืืืจ ืืคืฉืืืช ืฉื ืืืืืจ, ืืชื ืืืื ืืื ืืช ืชืืืื ื ืืืืจ ืืฉืืืืช ืฉื ืื ืื. ืื ื ืืฉืชืืฉืื ืืคื ืงืก ืจืฉืืืืช ื-Zeppelin, ืฉืื ืื ื ืืืื ืื ืืช ืืฆื ืืืฉืืืืช:
ืื ืืืื ืืืืืช ืื ืืืฉืง ืืืื ืืจื ื ืฉื Airflow ืขืฆืื:
ืงืื ืืจืืืช ืืืืืืจ ืืื ืงืื ืคืชืื, ืื ืืืกืคื ื ืืชืจืื ืืืืืจื. ืื ืืืคืข ืคืืขื ืฉื ืืฉืืื, ืื ืืชืจืืฉืช ืฉืืืื, ืฉืืื ืืืืจ ืืื ืืงืืืฆื ืืืืืจื, ืฉืื ืืืจืื ืฆืืืช ืืคืืชืื ืืืชืืืื ืืืื.
ืื ื ืืงืืืื ืืขื ื ืืืืจ ืืืืฆืขืืช ืืืืจื (ืื ื ืืจืฉ), ืืืจื ืืคืืื ืื ื ืืงืืืื ืชืืื ื ืืืืืช ืฉื ืืืฉืืืืช ื-Airflow.
ืืกื ืืื
ืืจืืืช ืืืืืจ ืืื ืืขืืงืจ ืงืื ืคืชืื, ืืืชื ืื ืฆืจืื ืืฆืคืืช ืืื ื ืื ืืกืื. ืืื ืืืื ืื ืืืฉืงืืข ืืื ืืืืืฅ ืืื ืืื ืืช ืคืชืจืื ืฉืขืืื. ืืืืจื ืืจืช ืืฉืื, ืชืืืื ืื, ืื ืฉืืื ืืช ืื. ืืืืจืืช ืคืืชืื, ืืืืฉืืช, ืงืืืช ืืืกืคืช ืชืืืืืื ืืืฉืื - ืืชื ืชืืื ืืช ืื. ืืืืื, ืืชื ืฆืจืื ืืืงืืืฉ ืชืฉืืืช ืื ืจืื ืืืจืืื ืืคืจืืืงื, ืืืฆืืืืช ืฉื ืืจืืืช ืืืืืืจ ืขืฆืื: ื ืืกืื ืื ืงืืจืื.
ืขืืฉืื ืืฉ ืื ื Airflow ืขืืื ืืื ืืื ื-6,5 ืืืฃ ืืฉืืืืช. ืื ืื ืฉืื ืื ืืืืคืืื. ืืฉื ื ืืฉืืืืช ืฉื ืืขืื ืช ื ืชืื ืื ื-DWH ืืจืืฉื ืืืงืืจืืช ืจืืื ืืฉืื ืื ืืืืื ืกืคืฆืืคืืื, ืืฉ ืืฉืืืืช ืฉื ืืืฉืื ืืืื ืืช ืจืืืื ืืชืื ื-DWH ืืจืืฉื, ืืฉ ืืฉืืืืช ืฉื ืคืจืกืื ื ืชืื ืื ื-DWH ืืืืจ, ืืฉ ืืจืื ืืืื ืืฉืืืืช ืฉืื ืืช - ื-Airflow ืืืขืก ืืช ืืืื ืืื ืืืจ ืืื. ืื ืืืืจืื ืืืกืคืจืื, ืืื 2,3 ืืืฃ ืืฉืืืืช ELT ืืืืจืืืืช ืืฉืชื ื ืืชืื DWH (Hadoop), ืืขืจื. 2,5 ืืืืช ืืืืจื ืืืืข ืืงืืจืืช, ืื ืฆืืืช ื 4 ืืคืชืื ETL, ืืฉืจ ืืืืืงืื ืืขืืืื ื ืชืื ืื ETL ื-DWH ืืขืืืื ื ืชืื ืื ELT ืืชืื DWH ืืืืืื ืขืื ืื ืื ืืื, ืืขืืกืง ืืชืฉืชืืช ืืฉืืจืืช.
ืชืืื ืืืช ืืขืชืื
ืืกืคืจ ืืชืืืืืื ืืื ืืืืจื, ืืืืืจ ืืขืืงืจื ืฉื ืขืฉื ืืืื ืืื ืฉื ืชืฉืชืืช Airflow ืืื ืงื ื ืืืื. ืื ืื ื ืจืืฆืื ืืื ืืช ืืฉืืื Airflow, ืืืงืฆืืช ืืื ืจืืืืื ืืขืืืื ืกืืจื ืืืืฆืืจ ืจืืฉ ืืฉืืคืื ืขืฆืื ืขื ืชืืืืื ืชืืืื ืขืืืื ืืืืืจ.
ืืคืืืื
ืื, ืืืืื, ืื ืื ืื ืฉืืืืชื ืจืืฆื ืืกืคืจ ืขื Airflow, ืืื ื ืืกืืชื ืืืืืืฉ ืืช ืื ืงืืืืช ืืขืืงืจืืืช. ืืชืืืืื ืื ืขื ืืืืืื, ื ืกื ืืช ืื ืืชืืื ืืช ืื :)
ืืงืืจ: www.habr.com