á°áá ááĽá! á áá á˝áá ááľáĽ áľá áŁá˝ áłáł áááá áŞáŤ áá°áśá˝á áááłá á áľá á ááľ áĽáŠ ááŁáŞáŤ ááááá áĽááááá ᣠáááłá ᣠá áľáá áľ DWH ááá á áĽááľá DataLake áá á¨á° áááľ ááľáĽá˘ áľá Apache Airflow (á¨áá á áá á¨á á¨á áá°áľ ááŁáá) áĽáááááŤáá. á áá Ꮰáá ááľáá áŁááá ááአáľáŠá¨áľ á°á°áĽáśáłáᣠáĽá á ááá ááá á˘áŤááľ á¨á á¨á áá°áľ áá˘á˛á¤á/á¤áᲠáá°áśá˝ áááááĽá áááááĽá á˛ááᥠáá¨áľ á°áᢠáááá áááłáá áĽáááŤááá˘
á¨áá áá°á á á˛ááŽá áŁáá ááľáĽ áľá°áŤ áľá DWH áááľ á°á¨áłáłá á˝ááá˝á áťááŠ. á áá ᨠMail.Ru áĄáľá áĄáľá á áŁá áááá áĽá á á¨ááłá á áŤáŁá˘ á¨ááἠáľáá°á ááľá¨áá áĽáŤáááá ááᢠá áĽááṠᣠáá áĽá á áľá°áłá˝ áááľááá˝ áĽá¨áłáŠ ᣠáĽá áĽá áĄáľá áĽáá áľá áĽá á¨áá¨á áľáá°á ááľá¨á áĽáááááŤáá á˘
áá áľá
áľááá
, áĽáááá. á¨á á¨á áá°áľ áááľá áá? áá
á¤á°-áá˝áááľ áá (ááá
á áá á¨á á¨á áá°áľ áá áá á áŤááľá áĽáááá¨áľ ᢠá¨áĽááąá ááááľ áĽá á áá á áá¨áłáľ á¨áá°áľáá áľá-á áá á áĽáŠ áááł áá°áŤááľ áá˝ááᢠááááŁáľ ááá á áŤá áłááŹááľáľ á á˛ááá ááŤá áá (á¨áá á áá DAG á°áĽá áá áŤá)á˘
DAG
DAG á á°áá°á á¨áá á°ááł áá°á¨áľ á áĽáĽá á á°áááá á á°á á°á¨á°á áá ááá á¨ááááá¸á á°ááŁáŽá˝á á ááłááľ áľááá áŤáá áá á á ááᢠá¨á á¨á áá°áľ á¨DAGs áĽá á¨ááá˝ á áŤááľ áá áááľáŤáľ ááš á¨áľá á áááá˝á áá°áŁááĄ-
DAG áá á áááľá áá˝áááĄ-
ááá˘á DAG á˛ááľá á DAG ááľáĽ á¨áľááš á°ááŁáŤáľ áĽáá°áááᥠá¨áŚááŹá°áŽá˝á áľáĽáľáĽ áŤáľáááŁáᢠáĽáá áá° áá á áľááá á áŤá á°áá°áá: á¨á á¨á áá°áľ áŚááŹá°á.
á¨ááá˝
áŚááŹá°á á á¨áľáááš á¨áĽáŤ áááłáá˝ áá á¨á°áá°á¨á° á áŤá áá, áá
á á¨áĽáŤ áááł á ááááá áľ áá áá áĽáá°ááá ááááťá.
- BashOperator - ᨠbash áľááá áááľáá¸á áŚááŹá°á.
- PythonOperator - ᨠPython áŽáľ ááá°áá áŚááŹá°áá˘
- á˘áá áŚááŹá°á - á˘áá áááá áŚááŹá°á á˘
- HTTPOperator - ᨠhttp áĽáŤááá˝ áá áááľáŤáľ áŚááŹá°áá˘
- SqlOperator - ᨠSQL áŽáľá áááľáá¸á á¨áá.
- áłáłá˝ á ááľá ááľá°áľ ááá á á áŚááŹá°á áá (á¨áááááá áá áááŁáľ ᣠá¨ááááá ááá áá˝áł ᣠá áá¨á ááą ááľáĽ áŤá ááľáá ᣠá¨á¤áá á ááὠᣠááᰠᣠááá°.)á˘
á°á¨á᪠á¨á°áá°á áŚááŹá°áŽá˝ á áᥠDockerOperatorᣠHiveOperatorᣠS3FileTransferOperatorᣠPrestoToMysqlOperatorᣠSlackOperatorá˘
áĽáá˛áá á áŤáľá áŁá
áŞáŤáľ áá á°ááľáá°á áŚááŹá°áŽá˝á ááłá á áĽá á ááŽáááľá ááľáĽ áá áááŁá¸á áá˝áá. áááłáᣠá°ááśá˝á á¨MongoDB áá° áá á¨ááá áŚááŹá°á áĽá áĽá áŚááŹá°áŽá˝á á¨áááá˛á˘ áá° áá áááá MongoDBToHiveViaHdfsTransfer ááĽá¨ááá˘
á ááá á, áĽááá áá á¨á°ááŁáŽá˝ ááłááá˝ ááá¸á á ááŁá¸á, áĽá á áá áľá áááááĽá á ááŞá áĽáááááŤáá.
áááááĽá á ááá
á¨á á¨á áá°áľ á°ááŁá áááááĽá á¨á°áááŁá á áá ááá˘
áĽáŤááłááą áááł á áŚáłáá˝ áĽááľ áá áá°áĽ á áá. DAG á˛ááĽáŠ áááł áá°á áááĄ-
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
á DAG á°á¨á á¨á°áááá áááł á á°ááŁá á°á¨á áááá ἠáá˝ááá˘
á¨á°áᨠáá°áľáŁ áááááĽáᣠáááá á°ááŁáŤáľ á á á¨á áá°áľ ááľáĽ á¨áááááľ áááááľ á áá áľá˘ á áĽáááąáŁ áááááĽá á áľááá áĽáŤáá˝á á¨áááááľ áááá ááŤááŽá˝ áááá¨áłáᢠáĽáŤá á¨ááá¸á á ááľ á á ááŤáł á°á¨ááá˝ ááľáĽ áŤááá.
- á¨áá°ááľ á°ááŁáŤáľ á DAG ááľáĽ á°á ááááᤠá á˛áľ áá°áá áá˝ááá˘
- áá¨áá áĽáá° á°ááŁáŤáśá˝ á áľá፠áá°á¨á°áŤá (á áľááŤáá˝á áĽáá˛á ááĽáĽá áá°á¨ááŁá¸á áá˝áá) áĽá á áááłá ááľáĽ áá ááľáá˘áŤ áŤá á°ááŁáŠ áá° áĽáŤ ááᣠáá˝ááá˘
- áá á áŤá°á á´á᪠áŤá, á°ááŁáŠ áá° áĽáą áááŤá; á á˝áአááľáĽ ááŽááŤá áŤááĄáľ áĽáŤ á ááľ ááá áá áŚááŹá°á á áá áá ááááŤáá˘
ááá á áá˘
áááááĽá á°áŞá á ááá á¨DAGs áľáĽáľáĽ áĽá á DAGs ááľáĽ áŁá ááá á°ááŁáŤáľ áá áá°áŤáá˘
áááááĽá á¨DAG áá ááĽáŤáľ áĽáá˛ááá DAG áááááĽá áááááľ áááá áłááĄ-
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
ááá-á¨á°ááá á
áľá-á
ááŚá˝ áľáĽáľáĽ á á- @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
ááŽá á áááážá˝áá áá áá áá˝áááĄ-
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
á¨ááľáá¸á፠áá
á¨á á¨á áá°áľ áĽáá´áľ áĽáá°áá°áŤ ááá¨áłáľáŁ á¨ááľáá¸á፠áá áDAG áá áĽáá°áá áá¨áłáľ á áľááá ááᢠá á á¨á áá°áľ ááľáĽ, DAG á¨ááľáá¸á፠áá ááŹáľ á áá, áááľá, á DAG á¨áľáŤ ááá ááĽá áá á áááľá¨áľ, ááĽáŤááłááą á¨á áááá áá á¨á°ááŁá ááłááá˝ ááá áŤá. áĽá ááĽáŤááłááą á¨ááľáá¸á፠áá á°ááŁáŤáľ áĽáá°áá áá¨ááá áá˝áá - ááá áááłá ᣠDAG á á ááŤáł á¨ááľáá¸á፠áááľ ááľáĽ á á°ááłáłá áá áá°áŤ áá˝ááᢠáá á ááá˝ áĽáá ááłáŤááĄ-
áĽáá° á áááłá°á áá (ááá áĽáá° áĽáľá áá: áĽáá° áááłá ââááá°áá), á DAG ááľáĽ áŤáá á°ááŁá á áááá á¨á°áľá°áŤá¨á, á ááľáá á¨ááľáá¸á፠áá ááľáĽ ááľáá¸ááŤá ááľá°áŤá¨áŤáá˝á áááľ ááľáĽ á ááľááŁáľ áááĽáá. á á˛áľ á áááŞááá á áá áá áŤáááľá áááŤáľ ááἠáĽáá°áá ááľááľ á¨ááá áá áĽáŠ áá ᣠáá áá¤áą ááĽá áá ááááŤáąá á¨áá¤áą á°á°áááááľ á ááˇá (á áĽááἠááá á¨áááááá á¨ááá áŽáľ áľáŞáľ ᨠGit áĽáá˛áááą áĽá áá áááľááľ á áŤáľá¸ááááľáᢠá ááľ áá áŤáľáááááłá, á ááááá áľ ááááľ).
á°ááŁáŤáľá ááá á
á¨DAG á á°ááŁá á á áááá ááľáĽ áŽáľ áá, áľááá á áá°áŠá áľ áá á¨áŽáąá áá á áááááľ á áŁá ááš ááááľ á áá, áááłá, á á°ááŤá¨áĄ áááŽá˝. áĽáá° ááá áśáľáľ MySQL áťááśá˝ á áá áĽáá áᣠáá° áĽáŤááłááłá¸á áááŁáľ áĽá á¨á°áá°á ááἠááá°áľ á ááĽá ᢠá¨áá á á áá, á á°ááĽá áĽá á áľááŠ. á DAG ááľáĽ áŤáá á¨áááá áŽáľ áá á áááľá áá˝áááĄ-
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG áá áá áááľáá
á áá á ááŁá á ááĽáŽášá á ááá á ááľá°áŤá¨á áĽá DAG á á áááá á¸áá°á´ áá¨á ááá ááľáááľ áá˝ááᢠááš!
áĽáá˛áá á¨á áá ááľáĽáľáĽ á¨áŽáľ áááá¨áľá áá áá áá˝áá ᣠáááłá ᣠá¨áááŽá˝ áá á áá¨á ááľ ááá ááľáŤáľ ááá á¨á á¨á´á ááá áá ᣠá¨á á¨á´á áá áááľáŤáľ áľáá° ááá áĽá ᨠDWH áá á¨á° áááľá áŁá áŞáá˝ á¨áááľ ááľáĽ á ááľááŁáľ áá°áľá ááá á áá˝áá ᢠN á áá á¨áŚá˝á áá° áá¨ááťá áááŤáᢠááá áááłá á¨áááŞáŤ áá á áááá ááá ááľáŤáľá á¨ááá°áá á¤áá á áá á ááľáŤáľ á¨áá áááá ááľáĽ N á°ááŁáŤáľá á DAG áááá¨áľáŁ á á¤áá á ááľáĽ áŤá á¨áĽáŤááá˝ áľááŠááľ áá° áááł ááá°áĽ áĽá áá§á ἠáá˝ááᢠá áľááá ááἠá¨á¤áá á. á°áááá!
áá¨ááť
á¨á á¨á áá°áľ á¨áŤáą á¨áá áá¨áᝠᣠá¨ááἠááł ( MySQL ááá Postgres ááá áá˝áá ᣠáĽá ááľáľááŹáľ á áá) ᣠá¨á°ááŁá áááłáá˝á ᣠDAGs ᣠá¨áááááľ ááźáśá˝ ᣠááá á áá á°ááááŽá˝á ᣠááá° ... ááá° áŤá¨ááťá ᢠáĽáá áĽá áááľ áĽááááá á á á¨á áá°áľ ááľáĽ áŤáá áá¨ááť á áŁá ááá (áá° 20 á á¨á´ááá˝) áĽá áááááá á¨áŤáľáá áá°áśá˝ á áአáá áááááŁáľ á¨ááá ááš ááᢠáĽáŤáá áĽáá´áľ áĽáá°áááᥠá¨áá¨áłáľá á ááľ áá¨á á áá ááĽááľ á¨áá á¨áŁá¸á á Informatica áá¨ááť ááľáĽ áŤááľá 100500 á á¨á´ááá˝ á áľáłááłááá˘
ááľáľá
á¨áá áŤáááŤáá áááááľ á¨áááľ ááľáĽ á ááľááŁáľ ááĽááľá ááš á¨áá á¨á°ááŁá ááľáľá áá°áľ ááááŁáľ áá˝ááᢠá áááá ááľáĽ á¨ááľáłááť á°áĽá°á áĽáá ááááᣠá¨á°ááŁáŤáľá áááł á¨ááááá¨áľá áľáĄ-
áá á¨á á¨á áá°áľ áŤáą á¨áľá á áááá˝ áááá áá˝áááĄ-
á¨á á¨á áá°áľ áŽáľ áááľ ááá áá, áľááá áá´áááŤá ááá፠á¨áá¨áá. áĽáŤááłááą á¨áĽáŤ ááľáŹá ááłáᣠáľá á°áľ á¨á°áá á¨áŁ ááá á¨áááľ áĽá á¨áľáá áĄáľá áŁáŤá°á°á áľ á á´áááŤá ááľáĽ áĄáľáá á áááá áááááľ áŤá°áááá˘
ááŁá ááá˝ á á´áááŤá áĽááá ááá (á¨á°ááá) áĽá á áááá á áŠá á á á¨á áá°áľ ááľáĽ á¨á°ááŁá á á ááá ááľá áĽááá áááá˘
Ô¸ŐÔ´ŐÔąŐŐŐŐ ÔłÔťŐ
á¨á á¨á áá°áľ á ááááľ áááľ ááá ááᣠáĽá á¨áĽáą á°á ááŤáľá áá á á á¨ááĽááľáᢠá¨áá áŤáá áááľá áááááŁáľ áá áĽá áĽá¨áľ áááľá¨á ááá áááᢠáᥠáá°á¨áľá áľ á¨áá˝á áá, áĽááá, áá áŤáá áá. á¨áĽáľááľ ááĽáᾠᣠá°áááááᾠᣠá áłá˛áľ áá°áśá˝á á¨áá¨áá áááááľ - áááłáᢠáĽááἠáá, áááŽáááą á á°á¨áááľ áĽá áľáŠá¨áľ ááľá áľ á ááĽááľ, á¨á á¨á áá°áľ áĽáŤáą áá¨áááľ: á°á ááŤáľ á áá¨á°áąá.
á áá á¨á á¨á áá°áľ á á¨áá áĽá¨á°áŤá ááᢠáá° 6,5 áşá á¨áá á á°ááŁáŤáľ. á áŁá áŞáŤá¸á á áŁá á¨á°ááŤáŠ áá¸áᢠá¨á°ááŤáŠ áĽá á áŁá á¨á°áá°á áááŽá˝ áá° ááá DWH á¨ááŤá á°ááŁáŤáľ á áᣠá ááá DWH ááľáĽ á¨áąá ááľ á¨ááľááľ áľáŤáá˝ á áᣠáá¨áá áá° ááŁá DWH á¨áá°á áľáŤáá˝áŁ áĽá áĽá áĽá á¨á°ááŤáŠ áľáŤáá˝ á á - áĽá á¨á á¨á áá°áľ á¨áá áá° áá áááá áŤááłá¸ááᢠá ááĽá áááá áá ááᢠ2,3 áşá á DWH (Hadoop) ááľáĽ á¨á°ááŤá¨ ááľáĽáľáĽááľ áŤáá¸á ELT á°ááŁáŤáľáŁ á áááľá˘ 2,5 ááś á¨ááἠááłáá˝ áááŽá˝, áá áĄáľá ᨠáá 4 á¨á˘á˛á¤á ááá˘áá˝á DWH áĽá á DWH ááľáĽ ELT áłáł áááá á áĽá ááá˝á á ETL áá¨á áá°áľ á¨á°á¨ááá áá¸áᢠá ááľ á áľá°áłáłáŞ, á¨á áááááąá áá á¨á° áááľ á¨áááá¨áľ.
ááá°ááą áĽá áľ
á¨áá°áą áĽááľ áá°á á¨áááá á˛áá á¨á á¨á áá°áľ áá á¨á° áááľ áá á á°áŤáŤá á¨ááá°ááá ááá ááá ááľáááľ ááᢠá¨á á¨á áá°áľ áááľá°á ááááŁáľ áĽááááááᣠáá´á᪠á°áŤá°áá˝ áĽááľ áĽááŽá˝á áĽáááľáŁáá áĽá á áŤáľ á¨ááŁá ááá ááľ á¨áľáŤ ááá ááĽá áá°áśá˝ áĽá áá¨ááť ááá˘
Epilogue
áá á áĽááἠáľá á á¨á áá°áľ áááá á¨ááááá ááá ááá á áá°áá ááá áá áá áá ááĽáŚášá áááááľ áá¨ááŠ. á¨ááἠááááľ á¨ááĽááľ áá áááŁá ᣠááááŠáľ áĽá áááłá :)
ááá: hab.com