Hello, Habr! In this article I want to tell you about a great tool for developing batch data-processing jobs, for example in the infrastructure of a corporate DWH or your DataLake. I'm talking about Apache Airflow (hereafter Airflow). It gets unfairly little attention on Habr, and in the main part of this article I will try to convince you that Airflow is at least worth considering when you choose a scheduler for your ETL/ELT processes.
Previously, while working at Tinkoff Bank, I wrote a series of articles on DWH. I am now part of the Mail.Ru Group team, where I develop a data analytics platform for the games division. As news and interesting solutions come up, my team and I will write here about our data analytics platform.
Prologue
So, let's begin. What is Airflow? It is a library (or…
Next, let's look at the main entities of Airflow. Understanding what they are and what they are for will help you organize your process architecture well. Perhaps the central one is the Directed Acyclic Graph (hereafter DAG).
DAG
A DAG is a meaningful grouping of tasks that must be executed in a strictly defined order according to a specific schedule. Airflow provides a convenient web interface for working with DAGs and other entities:
A DAG might look like this:
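The ordering constraint that makes a DAG useful can be illustrated without Airflow at all. The toy sketch below uses Python's standard `graphlib` module (Python 3.9+) to group the tasks of a hypothetical DAG into waves: a wave contains only tasks whose upstream tasks have all finished, so tasks within one wave could run in parallel. The task names are made up, and Airflow's own scheduler is considerably more involved.

```python
from graphlib import TopologicalSorter

# Hypothetical DAG: each task maps to the set of its upstream tasks.
DAG_DEPS = {
    "truncate_stg": set(),
    "export_shard_1": {"truncate_stg"},
    "export_shard_2": {"truncate_stg"},
    "load_profiles": {"export_shard_1", "export_shard_2"},
}


def execution_waves(deps):
    """Group tasks into waves; a task appears only after all of its upstream tasks."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not acyclic
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every task whose upstream tasks are done
        waves.append(ready)
        sorter.done(*ready)
    return waves


print(execution_waves(DAG_DEPS))
# [['truncate_stg'], ['export_shard_1', 'export_shard_2'], ['load_profiles']]
```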
When designing a DAG, the developer defines the set of operators from which the tasks inside the DAG will be built. This brings us to another important entity: the Airflow Operator.
Operator
An operator is the entity from which task instances are created; it describes what will happen while a task instance executes. Examples:
- BashOperator - an operator for executing bash commands.
- PythonOperator - an operator for calling Python code.
- EmailOperator - an operator for sending email.
- HTTPOperator - an operator for working with HTTP requests.
- SqlOperator - an operator for executing SQL code.
- Sensor - an operator that waits for an event (a given time arriving, a required file appearing, a row showing up in a database, a response from an API, and so on).
There are also more specific operators, such as DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator and SlackOperator.
You can also develop operators to suit your own needs and use them in your project. For example, we created MongoDBToHiveViaHdfsTransfer, an operator for exporting documents from MongoDB to Hive, as well as several other operators of our own.
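For reference, custom operators in Airflow subclass `BaseOperator` and put their work in `execute(self, context)`. Sensors subclass `BaseSensorOperator` and implement `poke(self, context)`, which is called every `poke_interval` seconds until it returns True or `timeout` expires. The plain-Python sketch below only mimics that shape so that it runs without Airflow installed. `ToyOperator`, `ToySensor` and `FileExistsSensor` are hypothetical names, not Airflow classes.

```python
import os
import tempfile
import time


class ToyOperator:
    """Plain-Python stand-in for the shape of an Airflow operator (not Airflow's API)."""

    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self, context):
        raise NotImplementedError


class ToySensor(ToyOperator):
    """Calls poke() every poke_interval seconds until it returns True or timeout expires."""

    def __init__(self, task_id, poke_interval=1.0, timeout=10.0):
        super().__init__(task_id)
        self.poke_interval = poke_interval
        self.timeout = timeout

    def poke(self, context):
        raise NotImplementedError

    def execute(self, context):
        started = time.monotonic()
        while not self.poke(context):
            if time.monotonic() - started > self.timeout:
                raise TimeoutError(f"{self.task_id}: condition not met within {self.timeout}s")
            time.sleep(self.poke_interval)
        return True


class FileExistsSensor(ToySensor):
    """Hypothetical sensor that waits for a file to appear at `path`."""

    def __init__(self, task_id, path, **kwargs):
        super().__init__(task_id, **kwargs)
        self.path = path

    def poke(self, context):
        return os.path.exists(self.path)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        flag = os.path.join(tmp, "ready.flag")
        open(flag, "w").close()  # the awaited file already exists, so the first poke succeeds
        print(FileExistsSensor("wait_for_flag", flag).execute(context={}))
```

The sensor blocks its caller while polling; that is the trade-off of the simple "poke" style of waiting.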
Next, all these task instances have to be executed, so let's talk about the scheduler.
Pool
Airflow's task scheduler is built on the following:
Each pool has a limit on the number of slots. When a DAG is created, it is assigned a pool:
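from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable

ALERT_MAILS = Variable.get("gv_mail_admin_dwh")  # alert recipients, kept in an Airflow Variable
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True  # a task instance runs only if the same task succeeded in the previous run
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'  # every task of this DAG takes a slot in this pool
PRIORITY_WEIGHT = 10

# start_date: midnight of the previous day
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}

dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__  # the module docstring is shown as DAG documentation in the web UI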
A pool defined at the DAG level can be overridden at the task level.
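Conceptually, a pool behaves like a counting semaphore: at most `slots` task instances from that pool run at once, and the rest wait for a slot to free up. The sketch below models that idea with `threading.BoundedSemaphore`. `ToyPool` is a hypothetical class; real Airflow pools are created in the web UI (Admin > Pools) or via the CLI and are enforced by the scheduler, not by threads in your code.

```python
import threading
import time


class ToyPool:
    """Models an Airflow-style pool: at most `slots` tasks may run at the same time."""

    def __init__(self, name, slots):
        self.name = name
        self._sem = threading.BoundedSemaphore(slots)
        self._lock = threading.Lock()
        self.running = 0
        self.max_running = 0  # highest concurrency observed, for checking the limit

    def run(self, task):
        with self._sem:  # blocks until one of the slots is free
            with self._lock:
                self.running += 1
                self.max_running = max(self.max_running, self.running)
            try:
                task()
            finally:
                with self._lock:
                    self.running -= 1


pool = ToyPool("dma_pool", slots=3)
threads = [threading.Thread(target=pool.run, args=(lambda: time.sleep(0.05),)) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(pool.max_running)  # never more than 3
```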
The scheduler, a separate process, is responsible for scheduling all tasks in Airflow. It handles all the mechanics of getting tasks ready for execution. A task goes through several stages before it is executed:
- The previous tasks in the DAG have completed, so the new task can be queued.
- The queue is sorted by task priority (priorities can be controlled as well), and if the pool has a free slot, the task can be started.
- If there is a free Celery worker, the task is sent to it, and the work you programmed in the task starts, using one operator or another.
Simple enough.
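The three stages above can be modelled in a few lines. This is a simplified, hypothetical simulation, not Airflow's scheduler code: on each tick, tasks whose upstream tasks are done are queued, the queue is sorted by `priority_weight` (highest first), and tasks are dispatched while their pool still has free slots. It assumes every dispatched task finishes before the next tick.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    task_id: str
    upstream: set = field(default_factory=set)
    priority_weight: int = 1
    pool: str = "default_pool"


def schedule(tasks, pool_slots):
    """Return the task_ids dispatched on each scheduler tick (toy model)."""
    done, ticks = set(), []
    pending = {t.task_id: t for t in tasks}
    while pending:
        # Stage 1: only tasks whose upstream tasks have all completed are queued
        queued = [t for t in pending.values() if t.upstream <= done]
        if not queued:
            raise ValueError("unsatisfiable dependencies")
        # Stage 2: sort the queue by priority, highest first (task_id breaks ties)
        queued.sort(key=lambda t: (-t.priority_weight, t.task_id))
        # Stage 3: dispatch while the task's pool still has a free slot this tick
        free = dict(pool_slots)
        batch = []
        for t in queued:
            if free.get(t.pool, 0) > 0:
                free[t.pool] -= 1
                batch.append(t.task_id)
        if not batch:
            raise ValueError("no free pool slots")
        for task_id in batch:
            del pending[task_id]
        done.update(batch)
        ticks.append(batch)
    return ticks


tasks = [
    Task("truncate_stg", priority_weight=10, pool="dma_pool"),
    Task("export_a", {"truncate_stg"}, 5, "dma_pool"),
    Task("export_b", {"truncate_stg"}, 7, "dma_pool"),
    Task("export_c", {"truncate_stg"}, 1, "dma_pool"),
    Task("load", {"export_a", "export_b", "export_c"}, 10, "dma_pool"),
]
print(schedule(tasks, {"dma_pool": 2}))
# [['truncate_stg'], ['export_b', 'export_a'], ['export_c'], ['load']]
```

With only two slots in `dma_pool`, the lowest-priority export has to wait for the next tick, which is exactly the effect pools and `priority_weight` are meant to give.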
The scheduler works over the set of all DAGs and all tasks within those DAGs.
For the scheduler to start working with a DAG, the DAG needs a schedule:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
There is a set of ready-made presets: @once, @hourly, @daily, @weekly, @monthly, @yearly.
You can also use cron expressions:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
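Each preset is shorthand for a cron expression. The mapping below follows the preset table in the Airflow documentation. `expand_field` is a hypothetical helper that expands a single cron field, covering only a subset of the syntax (`*`, single values, ranges, lists and `/step`), to show what `*/10` in the minute position means. Reading a bare `N/step` as "from N to the maximum" is an assumption, since cron dialects differ there.

```python
# Cron equivalents of the presets, per the Airflow documentation.
# "@once" has no cron equivalent: the DAG is scheduled a single time.
PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def expand_field(expr, lo, hi):
    """Expand one cron field into its sorted values (supports *, N, A-B, lists and /step)."""
    values = set()
    for part in expr.split(","):
        rng, _, step_str = part.partition("/")
        step = int(step_str) if step_str else 1
        if rng == "*":
            start, end = lo, hi
        elif "-" in rng:
            start, end = (int(x) for x in rng.split("-"))
        else:
            start = int(rng)
            end = hi if step_str else start  # assumption: "N/step" runs from N to hi
        values.update(range(start, end + 1, step))
    return sorted(values)


minute, hour, day, month, weekday = "*/10 * * * *".split()
print(expand_field(minute, 0, 59))  # [0, 10, 20, 30, 40, 50]
```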
Execution Date
To understand how Airflow works, it is important to understand a DAG's Execution Date. In Airflow every DAG has an Execution Date dimension: depending on the DAG's schedule, task instances are created for each Execution Date. Tasks can be re-run for any Execution Date, and a DAG can, for example, run for several Execution Dates at the same time. This is clearly illustrated here:
Unfortunately (or fortunately, depending on the situation), if the implementation of a task in the DAG is changed, runs for earlier Execution Dates will also use the modified code. That is good when you need to recompute data for past periods with a new algorithm, but bad because the result is no longer reproducible (of course, nothing stops you from retrieving the required version of the source code from Git and computing what you need, once, the way you need it).
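A point that often surprises newcomers: in Airflow 1.x the run stamped with execution date D covers the interval [D, D + schedule_interval) and is triggered only after that interval has ended. The sketch below lists the runs a fixed-interval schedule would produce up to a given moment. `dag_runs` is a hypothetical helper that ignores cron schedules, `catchup` settings and timezones.

```python
from datetime import datetime, timedelta


def dag_runs(start_date, interval, now):
    """List (execution_date, earliest_trigger_time) pairs for a fixed schedule interval.

    Models Airflow 1.x semantics: the run stamped with execution_date D covers
    [D, D + interval) and can start only once that interval has ended.
    """
    runs = []
    execution_date = start_date
    while execution_date + interval <= now:
        runs.append((execution_date, execution_date + interval))
        execution_date += interval
    return runs


start = datetime(2019, 1, 1)
for ds, trigger in dag_runs(start, timedelta(days=1), now=datetime(2019, 1, 3, 12)):
    print(ds.date(), "->", trigger)
# 2019-01-01 -> 2019-01-02 00:00:00
# 2019-01-02 -> 2019-01-03 00:00:00
```

Each of these runs is a separate unit that can be cleared and re-run on its own, which is what makes recomputing a past period possible.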
Task generation
Since a DAG's implementation is Python code, there is a very handy way to cut down the amount of code, for example when working with sharded sources. Suppose the source consists of several MySQL shards, and you need to go into each of them and pull data, independently and in parallel. The Python code in the DAG might look like this:
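# Not defined in this snippet: lv, the custom SqlToHiveViaHdfsTransfer operator,
# and the exec_truncate_stg / load_profiles tasks of the same DAG.
connection_list = lv.get('connection_list')  # connection ids of the MySQL shards

export_profiles_sql = '''
SELECT
    id,
    user_id,
    nickname,
    gender,
    {{params.shard_id}} as shard_id
FROM profiles
'''

# One export task per shard; all of them run in parallel between the same two tasks
for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },  # shard number = last character of conn_id
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)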
The DAG looks like this:
Now you can add or remove a shard simply by adjusting the settings and updating the DAG. Convenient!
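The fan-out/fan-in shape produced by that loop can be sketched without Airflow. The hypothetical `generate_export_tasks` below returns task specs and dependency edges for a list of connection ids (the ids themselves are made up). One deliberate difference: the DAG code takes the shard id as `conn_id[-1:]`, i.e. only the last character, which works only while shard numbers are single digits. This sketch instead reads all trailing digits with a regular expression, assuming connection ids end in the shard number.

```python
import re


def generate_export_tasks(connection_list, upstream="exec_truncate_stg", downstream="load_profiles"):
    """Build one export task spec per connection plus the fan-out/fan-in edges around them."""
    tasks, edges = [], []
    for conn_id in connection_list:
        match = re.search(r"(\d+)$", conn_id)  # trailing digits = shard number (assumption)
        if match is None:
            raise ValueError(f"connection id {conn_id!r} has no numeric shard suffix")
        task_id = "export_profiles_from_" + conn_id
        tasks.append({"task_id": task_id, "conn_id": conn_id, "params": {"shard_id": match.group(1)}})
        edges += [(upstream, task_id), (task_id, downstream)]
    return tasks, edges


tasks, edges = generate_export_tasks(["mysql_shard_1", "mysql_shard_2", "mysql_shard_12"])
print([t["params"]["shard_id"] for t in tasks])  # ['1', '2', '12']
print(len(edges))  # 6
```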
You can also use more elaborate code generation. For example, you can keep the sources in a database, describe the table structure and the algorithm for working with a table, and, taking the specifics of your DWH infrastructure into account, generate a process for loading N tables into staging. Or, when working with an API that does not accept a list as a parameter, you can generate N tasks in the DAG from such a list, limit the parallelism of API requests with a pool, and fetch the data you need from the API. Flexible!
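A minimal sketch of metadata-driven generation, assuming the table descriptions live in some metadata store (here a hypothetical in-memory `TABLES` list with made-up tables). Each description is rendered into an extraction query and a task spec. In a real DAG each spec would be turned into an operator instance inside a loop, as in the shard example above.

```python
# Hypothetical metadata, e.g. rows read from a table that describes the sources.
TABLES = [
    {"source": "profiles", "columns": ["id", "user_id", "nickname"], "target": "stg.profiles"},
    {"source": "payments", "columns": ["id", "user_id", "amount"], "target": "stg.payments"},
]


def staging_sql(spec):
    """Render the extraction query for one table from its metadata."""
    return f"SELECT {', '.join(spec['columns'])} FROM {spec['source']}"


def generate_load_tasks(tables):
    """One task spec per described table; a DAG file would create one operator per spec."""
    return [
        {"task_id": f"load_{t['source']}_to_stg", "sql": staging_sql(t), "target": t["target"]}
        for t in tables
    ]


for task in generate_load_tasks(TABLES):
    print(task["task_id"], "|", task["sql"])
# load_profiles_to_stg | SELECT id, user_id, nickname FROM profiles
# load_payments_to_stg | SELECT id, user_id, amount FROM payments
```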
Repository
Airflow has its own backend repository: a database (MySQL or Postgres can be used; we use Postgres) that stores the states of tasks and DAGs, connection settings, global variables and so on. The point is that Airflow's repository is very simple (about 20 tables) and convenient if you want to build your own processes on top of it. I remember the gazillion tables in the Informatica repository, which had to be studied for a long time before I understood how to write a query against it.
Monitoring
Because the repository is so simple, you can build whatever task-monitoring process suits you. We use a notebook in Zeppelin to check task status:
It could also be Airflow's own web interface:
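Because the metadata database is plain SQL, a monitoring notebook can query it directly. Airflow's `task_instance` table has, among others, the columns `dag_id`, `task_id`, `execution_date` and `state`. The sketch below recreates just those four columns in an in-memory SQLite database with made-up sample rows and runs a per-state summary; against the real repository you would run the same kind of query on Postgres, for example from a Zeppelin paragraph.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified subset of the columns of Airflow's task_instance table.
conn.execute("CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [  # made-up sample rows for illustration only
        ("dma_load", "exec_truncate_stg", "2019-01-01", "success"),
        ("dma_load", "export_profiles_from_shard1", "2019-01-01", "success"),
        ("dma_load", "export_profiles_from_shard2", "2019-01-01", "failed"),
        ("dma_load", "load_profiles", "2019-01-01", "upstream_failed"),
    ],
)

# Number of task instances in each state, per DAG and execution date.
STATUS_SQL = """
SELECT dag_id, execution_date, state, COUNT(*) AS n
FROM task_instance
GROUP BY dag_id, execution_date, state
ORDER BY dag_id, execution_date, state
"""
for row in conn.execute(STATUS_SQL):
    print(row)
```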
Since Airflow's code is open source, we added alerting to Telegram. Whenever a running task instance fails, it spams the Telegram group that the whole development and support team belongs to.
Telegram gives us a quick response when one is needed, and Zeppelin gives us the overall picture of Airflow's tasks.
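We added Telegram alerts by changing Airflow's code. An alternative that needs no patching is the `on_failure_callback` argument of an operator (or of `default_args`), which Airflow calls with the task context when a task instance fails. The sketch below builds a Telegram Bot API `sendMessage` request from that context. The token, chat id and message format are hypothetical, and the example at the bottom only builds the request without sending it.

```python
import json
import urllib.request
from types import SimpleNamespace


def build_alert_request(token, chat_id, context):
    """Build (but do not send) a Telegram Bot API sendMessage request for a failed task."""
    ti = context["task_instance"]
    text = f"Task failed: {ti.dag_id}.{ti.task_id} for {context['execution_date']}"
    body = json.dumps({"chat_id": chat_id, "text": text}).encode("utf-8")
    return urllib.request.Request(
        f"https://api.telegram.org/bot{token}/sendMessage",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def make_failure_callback(token, chat_id):
    """Return a callable with the on_failure_callback signature: it receives the task context."""
    def callback(context):
        with urllib.request.urlopen(build_alert_request(token, chat_id, context), timeout=10) as resp:
            resp.read()
    return callback


if __name__ == "__main__":
    # Fake context for illustration only; Airflow passes the real one to the callback.
    fake_context = {
        "task_instance": SimpleNamespace(dag_id="dma_load", task_id="load_profiles"),
        "execution_date": "2019-01-01T00:00:00",
    }
    req = build_alert_request("HYPOTHETICAL_TOKEN", -100123, fake_context)
    print(req.get_method(), req.full_url)
```

In a DAG file this could be wired up as `default_args['on_failure_callback'] = make_failure_callback(token, chat_id)`, with the token and chat id read, for example, from Airflow Variables.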
Summary
Airflow is first and foremost open source, so don't expect miracles from it. Be prepared to invest time and effort to build a working solution. The goal is achievable, and believe me, it is worth it. Speed of development, flexibility, ease of adding new processes: you will like it. Of course, you have to pay close attention to how the project is organized and to the stability of Airflow itself: miracles don't happen.
Today Airflow runs about 6,500 tasks every day, and they are very different in nature. There are tasks that load data into the main DWH from many different and very specific sources, tasks that compute data marts inside the main DWH, tasks that publish data to a fast DWH, and many, many other tasks, and Airflow chews through all of them day after day. In numbers, that is about 2,300 ELT tasks of varying complexity inside the DWH (Hadoop) and about 250 source databases. Behind it is a team of 4 ETL developers, split between ETL processing of data into the DWH and ELT processing of data inside the DWH, plus, of course, one administrator who takes care of the service infrastructure.
Plans for the future
The number of processes inevitably keeps growing, and the main work ahead of us on the Airflow infrastructure is scaling. We want to build an Airflow cluster, allocate a couple of nodes to Celery workers, and set up a self-duplicating head node with the job-scheduling processes and the repository.
Finale
Of course, this is not everything that could be said about Airflow, but I have tried to highlight the main points. Appetite comes with eating: give it a try and you'll like it :)
Source: habr.com