αα½ααααΈ α αΆα! αα αααα»αα’αααααααα αααα»αα αααα·ααΆαα’αααΈα§αααααααα’ααα αΆααααα½ααααααΆαααααααΎαααααΎαααΆαααααΎαααΆααα·ααααααααΆαααα»α α§ααΆα ααααα αααα»αα αααααΆαα ααΆαααααααααααααα»αα αα»α DWH α¬ DataLake ααααα’αααα ααΎαααΉααα·ααΆαα’αααΈ Apache Airflow (ααα αααα α ααΆ Airflow)α ααΆααααΌαααΆαααα αΌααααα’αα»αααα·αααα ααααααΆαααα α·ααααα»αααΆααααΎHabre α αΎααα αααα»ααααααααααΆαα αααα»αααΉαααααΆααΆααααα α»ααααα αΌαα’αααααΆ αααΆαα αα ααΆαα Airflow ααΊααΆααααααααΎααα αααααααΎαααΎααααααα·ααΈαααααααααααααΆααααααΎαααΆα ETL/ELT ααααα’αααα
ααΈαα»ααααα»αααΆααααααα’αααααααΆαααααααααΆααααΎαααααΆααα DWH αα αααααααααα»αααααΎααΆααα αααΆααΆα Tinkoff α α₯α‘αΌαααααααα»αααΆαααααΆαααΆααααααα½ααααααα»α Mail.Ru Group α αΎααααα»ααααααΎααααα·ααΆαααααΆααααΆααα·ααΆααα·αααααααα αααα»ααααααα ααααα ααΆααΆααα·αααΆαα αα ααααααααααααΆα αα·ααααααααααΆααα½αα±ααα αΆααα’αΆααααααααα α‘αΎα αααα»ααααααααα»α αα·ααααα»αααΉααα·ααΆααα ααΈαααα’αααΈαααα·ααΆααααααΎααααααΆααααΆααα·ααΆααα·ααααααα
Prologue
ααΌα
ααααααΌαα
αΆααααααΎαα ααΎααα αΌααααααααΊααΆα’αααΈ? αααααΊααΆαααααΆααα (α¬
α₯α‘αΌααααααΌααααα‘ααααΎαααΆαα»ααααΆααααα Airflow α ααΆααααααΆααααααΉαααΈααααΉαααΆα αα·αααααααααααααα½ααα α’αααα’αΆα αααα αααααΆααααααααααααΎαααΆαααααα’αααααΆααααΆαααα’αααααΎαα αααα ααααΆα’αααααΆαααααΆααααΊ Directed Acyclic Graph (ααα αααα α ααΆ DAG)α
DAG
DAG ααΊααΆααααΆααααααααααΆαα’αααααααααα·α αα ααΆαααααα’ααα αααα’αααα αααααααααΆαααααΆαααααααΆαααααααααΆαααΉααααΉαααααα ααΆαααΆααα·ααΆαααΆααααΆαααα½αα Airflow αααααααΌαα ααα»α αααααΆαααααααΆαααΆααααα½ααααααΆααααααΎααΆαααΆαα½α DAGs αα·αα’αααααΆαααααααααα
DAG α’αΆα ααΎααα ααΌα αααα
α’αααα’αα·αααααα αα ααααα ααΆ DAG ααΆαααΆαααααα»αααααααα·ααααα·αααααααΆααα·α αα αα αααα»α DAG ααΉαααααΌαααΆαααΆααααα αα ααΈαααααΎααααααα’αααααΆαααααΆαααα½αααα: ααααα·ααααα·ααααα αΌααααααα
ααααα·ααααα·αα
ααααα·ααααα·ααααΊααΆα’αααααΆααα½ααα
ααΎααΌαααααΆαααααααΈααΆαααΆαααααΌαααΆααααααΎα ααααα·αααααΆα’αααΈα’αααΈαααααΉαααΎαα‘αΎααααα»αα’αα‘α»ααααα’αα»ααααααΆαααΆαα
- BashOperator - ααααα·ααααα·αααααααΆααααααα·ααααα·ααΆααααααααΆ bash α
- PythonOperator - ααααα·ααααα·αααααααΆααααΆαα α ααΌα Python α
- EmailOperator - ααααα·ααααα·αααααααΆααααΆαααααΎα’ααΈαααα
- HTTPOperator - ααααα·ααααα·αααααααΆααααααΎααΆαααΆαα½αααααΎ http α
- SqlOperator - ααααα·ααααα·αααααααΆααααααα·ααααα·ααΌα SQL α
- α§αααααα αΆαααααααΆααΊααΆααααα·ααααα·αααααααΆαααααα αΆαααααΉαααα·ααΆααααα½α (ααΆαααααααααααααααΆαααααααΌαααΆα ααΌαααΆαααα―αααΆααααααααΌαααΆα αααααΆαααα αααα»αααΌαααααΆααα·αααααα ααΆαααααΎαααααΈ API ααΆααΎα)α
ααΆαααααα·ααααα·ααααΆααααΆαααααααααααα DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator α
α’αααααα’αΆα
α’αα·ααααααααα·ααααα·αααααααα’ααααΎααααααααααΆαααααα½αααααα’ααα α αΎαααααΎαα½αααΆαα
αααα»αααααααααααα’αααα α§ααΆα ααα ααΎαααΆααααααΎα MongoDBToHiveViaHdfsTransfer αααααΆααααα·ααααα·αααααααΆααααΆαααΆαα
ααα―αααΆαααΈ MongoDB αα
Hive αα·αααααα·ααααα·ααααΆα
αααΎααααααΆααααααΎααΆαααΆαα½α
αααααΆαααα αα·α αα ααΆαααΆααα’αααααα αΆαααΆα αααααΌαα’αα»αααα α αΎαα₯α‘αΌααααααΎαααΉααα·ααΆαα’αααΈαααααα·ααΈααααααααα
α’ααααααα αααΆααα·ααΆα
αααααα·ααΈααααααααααΆααα·α
αα
αααα Airflow ααααΌαααΆααααααΎαα‘αΎααα
ααΎ
α’αΆαααΈαα½ααααΆαααααααααααΎα ααα½ααααααααα αα ααααααααΎα DAG ααΆααααΌαααΆααααααα±ααα’αΆα:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
α’αΆααααααααααα
ααααα·α DAG α’αΆα
ααααΌαααΆαααα·ααααα
ααααα·αααΆααα·α
αα
α
ααααΎαααΆαααΆα
ααααα‘αααα½α Scheduler ααα½ααα»αααααΌααααα»αααΆααααααααΆααα·ααΆαααΆαααΆαααΆααα’αααα
αααα»α Airflow α ααΆααα·α Scheduler αααααααΆαααΆαα½αααααααΆαααΆααα’ααααααΆααααααααΆααα·α
αα
αααααΆααααΆαααααα·ααααα·α αα·α
αα
ααΆαααααΌαααααααΆααααααΆααααΆαααΆα
αααΎααα»αααΉαααααΌαααΆαα’αα»ααααα
- αα·α αα ααΆααα»ααααααΌαααΆααααα αααα αααα»α DAG α αΎα ααΆαααΆαααααΈα’αΆα ααααΌαααΆααααααααα½αα
- αα½αααααΌαααΆαααααααα’αΆαααααααΎα’αΆαα·ααΆααααα·α αα ααΆα (α’αΆαα·ααΆαα’αΆα ααααΌαααΆααααααααααααααα) α αΎαααααα·αααΎααΆααααααααααααααα αααα»αα’αΆα ααααα·α αα ααΆαα’αΆα ααααΌαααΆαααααΎαααΆαα
- ααααα·αααΎααΆα celery αααααααααα₯ααα·ααααα, ααΆααα·α αα ααααΌαααΆαααααΎαα ααΆ; ααΆαααΆααααα’αααααααααααααα·ααΈαααα»ααααα αΆα αΆααααααΎα αααααααΎααααα·ααααα·αααα½α α¬ααααααααα
ααΆαααααααααααααΆααα
αααααα·ααΈααααααααααααΎαααΆαααΎαααα»ααα DAGs αα·ααα·α αα ααΆαααΆααα’αααα αααα»α DAGs α
αααααΆαα Scheduler ααΎααααΈα αΆααααααΎαααααΎααΆαααΆαα½α DAG DAG ααααΌααααααααΆααα·ααΆαα
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
ααΆααααα»αααααΆααααααααΆαα»ααααααααααα½α
ααΆααααα
α @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
α’αααααα’αΆα ααααΎαααααα cronα
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
ααΆαααα·α ααααααααα·ααααα·
ααΎααααΈαααααΈααααααα Airflow ααααΎαααΆα ααΆααΆααΏαααααΆαααααααααΌααααααΈααΆαααα·α ααααααααα·ααααα·αααααΆαα DAG α αα αααα»α Airflow DAG ααΆααα·ααΆαααααΆαααα·α ααααααααα·ααααα· αααααΊα’αΆαααααααΎααΆααα·ααΆαααΆαααΆααααα DAG ααααΈαα·α αα ααΆαααααΌαααΆααααααΎααααααΆααααΆαααα·α ααααααααα·ααααα·ααΈαα½ααα α αΎααααααΆααααΆαααα·α ααααααααα·ααααα·ααΈαα½αα ααΆααα·α αα α’αΆα ααααΌαααΆαααααα·ααααα·α‘αΎααα·α α¬α§ααΆα ααα DAG α’αΆα ααααΎαααΆααααα»ααααααααΆαααααΆαααα»αααΆαααα·α ααααααααα·ααααα·ααΆα αααΎαα αααααααΌαααΆααααα αΆααααΆαα αααΆαααα ααΈαααα
ααΆα’αα»αα (α¬αααα ααααΆααααΆαα ααΆα’αΆαααααααΎααααΆαααΆα) ααααα·αααΎααΆαα’αα»ααααααΆααα·α αα αα αααα»α DAG ααααΌαααΆααααααααΌα αααααΆαααααα·ααααα·αααα»αααΆαααα·α ααααααααα·ααααα·αα»αααΉαααααααααα·αααΌαααΈααΆααααααααΌαα ααΆααα’ααααα·αααΎα’αααααααΌαααααΆαα·ααααααα‘αΎααα·ααααα»αααααααααααααααααααααΎαααα½ααααααααΆαααααΈ ααα»ααααααΆαα·αααα’αααααααααΆαααα·αα‘αΎααα·αααααααααααααΌαααΆαααΆααααα (ααΆααΆααα·αααΆααααααΆαααααΆααααΆααααααΆαα’αααα±αααααα‘αααααααααααααΌαααΆαααααΌααααααααΈ Git α αΎαααααΆα’αααΈ α’αααααααΌαααΆαααααααααΆαα·ααΈαααα’αααααααΌαααΆα)
ααΆααααααΎαααΆααα·α αα
ααΆαα’αα»αααα DAG ααΊααΆααΌα Python ααΌα ααααααΎαααΆααα·ααΈααΆααααα½ααααα»ααααα»αααΆαααΆαααααααα ααα½αααΌααα αααααααΎααΆα α§ααΆα αααααΆαα½αααααααααααΆααααααα α§αααΆααΆα’αααααΆα MySQL shards α ααα½αααΈααΆααααα α’αααααααΌαα‘αΎαα αΌααα αααα»αααΈαα½αα α αΎααααα·αααααααα½αα ααα½αα ααΎαααΈααααα ααααααα―αααΆααααα·αααααααααΆα ααΌα Python αα αααα»α DAG α’αΆα ααΎααα ααΌα αααα
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG ααΎααα ααΌα αααα
αααα»αααααΈααα α’αααα’αΆα αααααα α¬αα»α shard αααααααΆαααααααααααΌαααΆαααααα αα·αααααΎαα αα α»ααααααααΆα DAG α ααΆαβααΆβαα»αααΆα!
α’αααααα’αΆα ααααΎααΆααααααΎαααΌααααα»αααααααΆαααΆααααααααα α§ααΆα ααα ααααΎααΆαααΆαα½αααααααααα»αααααααααααΌαααααΆααα·αααααα α¬αα·αααααΆα’αααΈαα ααΆααααααααααΆααΆα αααα½ααααααααΆααααααΆααααααΎααΆαααΆαα½αααΆααΆα αα·αααααα·αααΌαααΈαααααααα·αααααα αααααΆαα ααΆαααααααα DWH αααααΎαααααΎαααΆααα½αα αααααΆαααααα»αααΆααΆα N αα αααα»ααααααααααα»αααααα’αααα α¬α§ααΆα ααα ααΆαααααΎααΆαααΆαα½α API ααααα·αααΆααααααΆαααααΎααΆαααΆαα½ααααΆαααΆαααααααααα»ααααααααααααααΈ α’αααα’αΆα αααααΎααα·α αα ααΆα N αααα»α DAG ααΈαααααΈααα αααααααΆαααααααααΆααααααΎαα αααα»α API αα α’αΆααα½α α αΎαααααα αα·ααααααα αΆαααΆα αααΈ API α αααααα!
ααααΆαα
Airflow ααΆαααααΆαα backend ααααΆαααααα½α ααΌαααααΆααα·αααααα (α’αΆα ααΆ MySQL α¬ Postgres ααΎαααΆα Postgres) ααααααα»αααΌαααααΆαααΆααα·α αα ααΆα DAGs ααΆααααααααΆααααααΆαα α’ααααααααα αα ααΈααα αααα»αα αααα·ααΆαααΆαααΆ ααααΆαααα αααα»α Airflow ααΊααΆααααααΆαα (αααα αα 20 αα») α αΎαααΆααααα½αααααα·αααΎα’αααα αααααααΎαααααΎαααΆαααααΆαααααα½αααααα’ααααα ααΎααΆα αααα»αα αΆαααΆααΆαα ααα½α 100500 αα αααα»αααααΆαα Informatica αααααααΌααα·ααααΆα’ααααααααααΆααΌααα»ααααααααααααααΈαααααααααΎααααα½αα
ααΆααααα½ααα·αα·ααα
ααααα·αααΈααΆαααΆααααααααααΆαα α’αααα’αΆα αααααΎαααααΎαααΆααααα½ααα·αα·ααααα·α αα ααΆααααααΆααααα½ααααααΆααα’αααα ααΎαααααΎ notepad αα αααα»α Zeppelin αααααΎαααΎαααααΆαααΆααααα·α αα ααΆαα
αααααα’αΆα ααΆα ααα»α αααααΆαααααααΆααααα Airflow αααα½αααΆαααααα
αααααΌα Airflow ααΊααΆαααααααΎαα αα ααΌα ααααααΎαααΆαααααααααΆαααΌαααααΉααα ααΆαα Telegram α ααΆααααααΈααααααα»αααααΎαααΆααααα·α αα ααΆααα½α ααααα·αααΎααΆαααα α»αααΎαα‘αΎα ααα spam αααα»ααα αααα»α Telegram ααααααα»αα’αα·αααααα αα·ααααα½αααΆααααΌαααΆαα
ααΎαααα½αααΆαααΆαααααΎαααααααΆααααΆαααα Telegram (ααααα·αααΎα αΆαααΆα α) α αΎαααΆαααα Zeppelin ααΎαααα½αααΆαααΌαααΆαααΆααααΌααααα·α αα ααΆααα αααα»α Airflow α
ααα»α
Airflow ααΊααΆαααααααΎαα αα ααΆα αααα α αΎαα’ααααα·ααα½αααααΉαααΆααΉαααΆαα’ααααΌαα ααα»ααΈααΆααα ααααααααα½αααΎααααΈααΆαααααααααΆ αα·αααΆααα·αααααααΉααααααααΎααααΈαααΆααααααααααΆααααααααΎαααΆα ααααα ααΊα’αΆα αααααα ααΆα ααΏαααα»α ααΆααΆααααααα ααααΏαααααΆαα’αα·ααααααααΆαααααααααΆαααΆααααα½αααααΆαααααααααααΎαααΆαααααΈ - α’αααααΉαα αΌαα α·αααααΆα ααΆααΆααα·αααΆααα’αααααααΌαααα α·ααααα»αααΆαααααΆαααααΆααα ααααααΆααααα ααααααααααααααΆαααααα αΌααααααααααααα½αα―α: α’ααααΌαα ααα»αα·αααΎαα‘αΎαααα
α₯α‘αΌααααααΎαααΆα Airflow ααααΎααΆαααΆαααααΆαααααα αααα αα 6,5 ααΆααααΆααα·α αα . αα½αααααΆαα αα·ααα»αααααΆααααΆααα ααΆαααΆααα·α αα αααα»ααα·αααααααα αααα»α DWH α ααααααΈαααααααααααααααΆ αα·αααΆααααΆαααααα»α ααΆαααΆααα·α αα ααααΆαα»ααααα·ααα ααΆααααα»α DWH ααααΆαα ααΆαααΆααα·α αα ααααα»ααααα·αααααααα αααα»α DWH ααΏα ααΆαααΆαααΆαα αααΎα α αααΎααααααααααΆ αα·αααα αΌαααααα ααααΆαααΆααααα½αααααα αα·ααΆαβααΆβαααβααΊβαααβα―α α₯α’ αααΊα ααΆααα·α αα ELT ααααΆααααα»αααααΆαααααααααααΆαα αααα»α DWH (Hadoop) αααα αΆαααααα ααα ααΌαααααΆααα·αααααα 2,5 αα ααααα, αααααΊααΆαααα»αααααΈ 4 α’αααα’αα·αααααα ETLαααααααΌαααΆααααα αααα ααΆααααΎαααΆααα·αααααα ETL αα αααα»α DWH αα·α ELT ααααΎαααΆααα·ααααααααΆααααα»α DWH αα·αααΆααΆααα·αααΆααααΆα αααΎαααα α’ααααααααααααααααΆααααααααααααΆαααΆαα½αα αααααΆαα ααΆααααααααααααααΆααααα
αααααΆααααααΆααα’ααΆαα
α ααα½αααααααΎαααΆααααα»αααΎαα‘αΎααααααααα·ααα½α α αΎαααΏαααααΆαααααααΎαααΉαααααΎααΆααααααΉαα αααααΆαα ααΆαααααααα Airflow ααΊααΆαααααΎααΆαααααααΆαα ααΎαα αααααααΎααααα»α Airflow αααα ααααΎααα½αααΌαααααΆαααααααα Celery αα·ααααααΎαααααΆααααα½αααααααα½αα―αααΆαα½αααΉαααααΎαααΆααααα αααΆααα·ααΆαααΆαααΆα αα·ααααααααααα»αα
Epilogue
ααΆααΆααα·αααΆαα ααααα·ααααααΆα’αααΈααααααααΆαααααααα»αα ααααααΆααα’αααΈ Airflow ααααα ααα»αααααααα»αααΆαααααΆααΆαααΌααααααΆααα ααα»α ααααΆαααα α ααααβα αααΌαβα α·αααβααβααΆαα½αβααΉαβααΆαβαααΆα ααΆαααααβααΆβα αΎαβα’αααβααΉαβα αΌαα α·αααβααΆ :)
ααααα: www.habr.com