He mea hana ka Airflow no ka hoʻomohala maʻalahi a me ka mālama ʻana i nā kaʻina hana ʻikepili batch

He mea hana ka Airflow no ka hoʻomohala maʻalahi a me ka mālama ʻana i nā kaʻina hana ʻikepili batch

Aloha, Habr! Ma kēia ʻatikala makemake wau e kamaʻilio e pili ana i kahi mea hana maikaʻi loa no ka hoʻomohala ʻana i nā kaʻina hana ʻikepili batch, no ka laʻana, i ka ʻōnaehana o kahi hui DWH a i ʻole kāu DataLake. E kamaʻilio mākou e pili ana iā Apache Airflow (ma hope i kapa ʻia ʻo Airflow). Ua hoʻonele pono ʻole ia i ka nānā ʻana iā Habré, a ma ka ʻāpana nui e hoʻāʻo wau e hōʻoiaʻiʻo iā ʻoe ma ka liʻiliʻi o ka Airflow pono ke nānā aku i ke koho ʻana i kahi mea hoʻonohonoho no kāu kaʻina hana ETL/ELT.

Ma mua, ua kākau au i nā ʻatikala e pili ana i ke kumuhana o DWH i koʻu hana ʻana ma Tinkoff Bank. I kēia manawa ua lilo wau i ʻāpana o ka hui Mail.Ru Group a ke hoʻomohala nei au i kahi kahua no ka ʻikepili ʻikepili ma ka wahi pāʻani. ʻOiaʻiʻo, ke ʻike ʻia nā nūhou a me nā hopena hoihoi, e kamaʻilio wau me kaʻu hui ma ʻaneʻi e pili ana i kā mākou kahua no ka ʻikepili ʻikepili.

Prologue

No laila, e hoʻomaka kākou. He aha ka Airflow? He hale waihona puke kēia (a i ʻole pūʻulu hale waihona puke) e hoʻomohala, hoʻolālā a nānā i nā kaʻina hana. ʻO ka hiʻohiʻona nui o Airflow: Hoʻohana ʻia ka code Python e wehewehe (hoʻomohala) i nā kaʻina hana. He nui nā pōmaikaʻi o kēia no ka hoʻonohonoho ʻana i kāu papahana a me ka hoʻomohala ʻana: ma ke ʻano, ʻo kāu (no ka laʻana) ʻo ka papahana ETL he papahana Python wale nō, a hiki iā ʻoe ke hoʻonohonoho iā ia e like me kou makemake, e noʻonoʻo ana i nā kikoʻī o ka ʻōnaehana, ka nui o ka hui a nā koi ʻē aʻe. He mea maʻalahi nā mea a pau. E hoʻohana no ka laʻana PyCharm + Git. He nani a kūpono loa!

I kēia manawa, e nānā kākou i nā mea nui o Airflow. Ma ka hoʻomaopopo ʻana i ko lākou ʻano a me ke kumu, hiki iā ʻoe ke hoʻonohonoho maikaʻi i kāu hoʻolālā hana. ʻO ka Directed Acyclic Graph paha ka mea nui (i kapa ʻia ma hope aku ʻo DAG).

DAG

ʻO DAG kekahi hui koʻikoʻi o kāu mau hana āu e makemake ai e hoʻopau i kahi kaʻina i wehewehe ʻia e like me ka papa kuhikuhi kikoʻī. Hāʻawi ʻo Airflow i kahi kikowaena pūnaewele kūpono no ka hana ʻana me DAG a me nā mea ʻē aʻe:

He mea hana ka Airflow no ka hoʻomohala maʻalahi a me ka mālama ʻana i nā kaʻina hana ʻikepili batch

Penei paha ka DAG:

He mea hana ka Airflow no ka hoʻomohala maʻalahi a me ka mālama ʻana i nā kaʻina hana ʻikepili batch

ʻO ka mea hoʻomohala, i ka hoʻolālā ʻana i kahi DAG, waiho i kahi hoʻonohonoho o nā mea hana e kūkulu ʻia ai nā hana i loko o ka DAG. Eia mākou i hele mai i kahi mea nui ʻē aʻe: Airflow Operator.

ʻO nā mea hana

ʻO ka mea hoʻohana he hui ma ke kumu o ka hana ʻana o nā hana hana, e wehewehe ana i nā mea e hiki mai ana i ka wā o ka hoʻokō ʻana i kahi hana hana. Hoʻokuʻu ʻia ka ea mai GitHub ua loaʻa i kahi pūʻulu o nā mea hana mākaukau e hoʻohana. Nā laʻana:

  • BashOperator - mea hoʻohana no ka hoʻokō ʻana i kahi kauoha bash.
  • PythonOperator - mea hoʻohana no ke kāhea ʻana i ka code Python.
  • EmailOperator — mea hoʻohana no ka hoʻouna leka uila.
  • HTTPOperator - mea hoʻohana no ka hana ʻana me nā noi http.
  • SqlOperator - mea hoʻohana no ka hoʻokō ʻana i ka code SQL.
  • He mea hoʻohana ʻo Sensor no ke kali ʻana i kahi hanana (ka hōʻea ʻana o ka manawa i koi ʻia, ke ʻano o ka faila i makemake ʻia, kahi laina i ka waihona, kahi pane mai ka API, etc., etc.).

Aia nā mea hoʻohana kikoʻī hou aku: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Hiki iā ʻoe ke hoʻomohala i nā mea hana e pili ana i kāu mau hiʻohiʻona a hoʻohana iā lākou i kāu papahana. No ka laʻana, ua hana mākou iā MongoDBToHiveViaHdfsTransfer, he mea hoʻohana no ka lawe ʻana i nā palapala mai MongoDB a i Hive, a me kekahi mau mea hana no ka hana pū ʻana me KaomiHouse: CHLoadFromHiveOperator a me CHTableLoaderOperator. ʻO ka mea nui, i ka wā i hoʻohana pinepine ai kahi papahana i ke code i kūkulu ʻia ma nā ʻōlelo kumu, hiki iā ʻoe ke noʻonoʻo e pili ana i ke kūkulu ʻana i kahi ʻōlelo hou. E maʻalahi kēia i ka hoʻomohala hou ʻana, a e hoʻonui ʻoe i kāu waihona o nā mea hana i ka papahana.

A laila, pono e hoʻokō ʻia kēia mau hana āpau, a i kēia manawa e kamaʻilio mākou e pili ana i ka mea hoʻonohonoho.

Mea hoʻonohonoho

Kūkulu ʻia ka mea hoʻonohonoho hana a Airflow Celery. ʻO Celery kahi waihona Python e hiki ai iā ʻoe ke hoʻonohonoho i kahi queue me ka hoʻokō asynchronous a hoʻohele ʻia i nā hana. Ma ka ʻaoʻao Airflow, hoʻokaʻawale ʻia nā hana āpau i loko o nā loko. Hana lima ʻia nā loko. ʻO ka mea maʻamau, ʻo kā lākou kumu e kaupalena i ka hana o ka hana ʻana me ke kumu a i ʻole e hōʻike i nā hana i loko o ka DWH. Hiki ke mālama ʻia nā loko ma o ka ʻaoʻao pūnaewele:

He mea hana ka Airflow no ka hoʻomohala maʻalahi a me ka mālama ʻana i nā kaʻina hana ʻikepili batch

He palena ko kēlā me kēia loko i ka helu o nā slots. I ka hana ʻana i kahi DAG, hāʻawi ʻia i kahi wai:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Hiki ke hoʻopau ʻia kahi loko i wehewehe ʻia ma ka pae DAG ma ka pae hana.
ʻO kahi kaʻina kaʻawale, Scheduler, ke kuleana no ka hoʻonohonoho ʻana i nā hana āpau ma Airflow. ʻOiaʻiʻo, pili ʻo Scheduler i nā mīkini āpau o ka hoʻonohonoho ʻana i nā hana no ka hoʻokō. Ke hele nei ka hana ma nā ʻāpana he nui ma mua o ka hoʻokō ʻia:

  1. Ua hoʻopau ʻia nā hana ma mua i ka DAG; hiki ke hoʻopaʻa ʻia kahi mea hou.
  2. Hoʻokaʻawale ʻia ka pila ma muli o ka mea nui o nā hana (hiki ke hoʻomalu ʻia nā mea nui), a inā loaʻa kahi slot manuahi i loko o ka loko wai, hiki ke lawe ʻia ka hana.
  3. Inā loaʻa kahi celery limahana manuahi, hoʻouna ʻia ka hana iā ia; hoʻomaka ka hana āu i hoʻolālā ai i ka pilikia, me ka hoʻohana ʻana i kekahi mea hoʻohana.

Maʻalahi lawa.

Holo ka mea hoʻonohonoho ma ka hoʻonohonoho o nā DAG āpau a me nā hana āpau i loko o DAG.

No ka hoʻomaka ʻana o ka Scheduler e hana pū me DAG, pono ka DAG e hoʻonohonoho i kahi papahana:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Aia kahi hoʻonohonoho o nā preset i hoʻomākaukau ʻia: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Hiki iā ʻoe ke hoʻohana i nā ʻōlelo cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Lā Hoʻokō

No ka hoʻomaopopo ʻana i ka hana ʻana o Airflow, pono e hoʻomaopopo i ka lā hoʻokō no kahi DAG. Ma ka Airflow, loaʻa iā DAG kahi ana o ka lā hoʻokō, ʻo ia hoʻi, ma muli o ka papa hana hana a DAG, hana ʻia nā hana hana no kēlā me kēia lā hoʻokō. A no kēlā me kēia lā hoʻokō, hiki ke hoʻokō hou ʻia nā hana - a i ʻole, no ka laʻana, hiki i kahi DAG ke hana like i nā lā hoʻokō. Hōʻike maopopo ʻia kēia ma ʻaneʻi:

He mea hana ka Airflow no ka hoʻomohala maʻalahi a me ka mālama ʻana i nā kaʻina hana ʻikepili batch

ʻO ka mea pōʻino (a ʻoi paha ka pōmaikaʻi: pili ia i ke kūlana), inā hoʻoponopono ʻia ka hoʻokō ʻana o ka hana ma ka DAG, a laila e hoʻomau ka hoʻokō ʻana i ka lā hoʻokō mua e noʻonoʻo i nā hoʻoponopono. Maikaʻi kēia inā pono ʻoe e helu hou i ka ʻikepili i nā wā i hala me ka hoʻohana ʻana i kahi algorithm hou, akā maikaʻi ʻole no ka mea ua nalowale ka reproducibility o ka hopena (ʻoiaʻiʻo, ʻaʻohe mea e hoʻopilikia iā ʻoe e hoʻihoʻi i ka mana o ke kumu kumu mai Git a helu i ka mea pono ʻoe i hoʻokahi manawa, ke ala āu e pono ai).

Hana i nā hana

ʻO ka hoʻokō ʻana o ka DAG he code ma Python, no laila he ala kūpono loa mākou e hōʻemi i ka nui o ke code i ka wā e hana ai, no ka laʻana, me nā kumu sharded. E ʻōlelo mākou he ʻekolu ʻāpana MySQL ʻoe i kumu, pono ʻoe e piʻi i kēlā me kēia a ʻohi i kekahi mau ʻikepili. Eia kekahi, kūʻokoʻa a me ka like. ʻO ke code Python ma ka DAG e like paha me kēia:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

Penei ke ano o ka DAG:

He mea hana ka Airflow no ka hoʻomohala maʻalahi a me ka mālama ʻana i nā kaʻina hana ʻikepili batch

I kēia hihia, hiki iā ʻoe ke hoʻohui a wehe paha i kahi shard ma ka hoʻoponopono wale ʻana i nā hoʻonohonoho a me ka hoʻonui ʻana i ka DAG. ʻoluʻolu!

Hiki iā ʻoe ke hoʻohana i ka hanauna code paʻakikī, no ka laʻana, hana me nā kumu ma ke ʻano o kahi waihona a wehewehe paha i kahi ʻano papaʻaina, kahi algorithm no ka hana ʻana me kahi papaʻaina, a me ka noʻonoʻo ʻana i nā hiʻohiʻona o ka ʻōnaehana DWH, hana i kahi kaʻina. no ka hoʻouka ʻana i nā papa N i kāu waihona. A i ʻole, no ka laʻana, e hana pū me kahi API ʻaʻole kākoʻo i ka hana ʻana me kahi ʻāpana ma ke ʻano o kahi papa inoa, hiki iā ʻoe ke hana i nā hana N i kahi DAG mai kēia papa inoa, e kaupalena i ka like like o nā noi i loko o ka API i kahi wai, a ʻohi. ka ʻikepili pono mai ka API. Hiki ke loli!

waihona waihona

Loaʻa i ka Airflow kona waihona backend pono'ī, kahi waihona (hiki iā MySQL a Postgres paha, loaʻa iā mākou nā Postgres), kahi e mālama ai i nā mokuʻāina o nā hana, DAGs, nā hoʻonohonoho pili, nā mea hoʻololi honua, etc., etc. ʻO ka waihona ma Airflow he mea maʻalahi loa (e pili ana i 20 mau papa) a maʻalahi inā makemake ʻoe e kūkulu i kāu mau hana ponoʻī ma luna. Hoʻomanaʻo wau i nā papa 100500 i loko o ka waihona Informatica, pono e aʻo ʻia no ka manawa lōʻihi ma mua o ka hoʻomaopopo ʻana i ke kūkulu ʻana i kahi nīnau.

Ka mālama ʻana

Hāʻawi ʻia i ka maʻalahi o ka waihona, hiki iā ʻoe ke kūkulu i kahi kaʻina hana nānā i kūpono iā ʻoe. Hoʻohana mākou i kahi notepad ma Zeppelin, kahi e nānā ai mākou i ke kūlana o nā hana:

He mea hana ka Airflow no ka hoʻomohala maʻalahi a me ka mālama ʻana i nā kaʻina hana ʻikepili batch

ʻO kēia paha ke kikowaena pūnaewele o Airflow ponoʻī:

He mea hana ka Airflow no ka hoʻomohala maʻalahi a me ka mālama ʻana i nā kaʻina hana ʻikepili batch

He kumu wehe ka Airflow code, no laila ua hoʻohui mākou i ka makaʻala iā Telegram. ʻO kēlā me kēia hiʻohiʻona holo o kahi hana, inā loaʻa kahi hewa, spam i ka hui ma Telegram, kahi i loaʻa ai ka hui hoʻomohala holoʻokoʻa a me ke kākoʻo.

Loaʻa iā mākou kahi pane wikiwiki ma o Telegram (inā koi ʻia), a ma o Zeppelin e loaʻa iā mākou ke kiʻi holoʻokoʻa o nā hana ma Airflow.

Hōʻuluʻulu

ʻO ka Airflow kahi kumu wehe mua, a ʻaʻole pono ʻoe e manaʻo i nā hana mana mai ia mea. E mākaukau e hoʻokomo i ka manawa a me ka hoʻoikaika e kūkulu i kahi hoʻonā e hana. Hiki ke hoʻokō ʻia ka pahuhopu, manaʻoʻiʻo mai iaʻu, pono ia. Ka wikiwiki o ka hoʻomohala ʻana, ka maʻalahi, ka maʻalahi o ka hoʻohui ʻana i nā kaʻina hana hou - makemake ʻoe. ʻOiaʻiʻo, pono ʻoe e uku nui i ka hoʻonohonoho ʻana o ka papahana, ke kūpaʻa o ka Airflow ponoʻī: ʻaʻole hiki i nā hana mana.

I kēia manawa ua hana mākou i ka Airflow i kēlā me kēia lā ma kahi o 6,5 tausani mau hana. He okoa loa ko lakou ano. Aia nā hana o ka hoʻouka ʻana i ka ʻikepili i loko o ka DWH nui mai nā kumu like ʻole a kikoʻī loa, aia nā hana o ka helu ʻana i nā hale kūʻai i loko o ka DWH nui, aia nā hana o ka hoʻopuka ʻana i ka ʻikepili i kahi DWH wikiwiki, he nui nā hana like ʻole - a me ka Airflow nau ia lakou a pau i kela la i keia la. Ma ka helu ʻana, ʻo kēia 2,3 tausani Nā hana ELT o ka paʻakikī like ʻole i loko o DWH (Hadoop), ma kahi o. 2,5 haneri waihona nā kumu, he hui kēia mai 4 mea hoʻomohala ETL, i māhele ʻia i ka hoʻoili ʻikepili ETL ma DWH a me ka hoʻoili ʻikepili ELT i loko o DWH a ʻoi aku ka nui hoʻokahi admin, ka mea e pili ana i ka pono o ka lawelawe.

Nā papa no ka wā e hiki mai ana

Ke ulu nei ka nui o nā kaʻina hana, a ʻo ka mea nui a mākou e hana ai e pili ana i ka ʻōnaehana Airflow ke scaling. Makemake mākou e kūkulu i kahi pūʻulu Airflow, hoʻokaʻawale i ʻelua mau wāwae no nā limahana Celery, a hana i kahi poʻo kope kope me nā kaʻina hoʻonohonoho hana a me kahi waihona.

Epilogue

ʻO kēia, ʻoiaʻiʻo, ʻaʻole ia nā mea a pau aʻu e makemake ai e haʻi e pili ana i ka Airflow, akā ua hoʻāʻo wau e hōʻike i nā kumu nui. Hele mai ka ʻai me ka ʻai, e hoʻāʻo a makemake ʻoe :)

Source: www.habr.com

Pākuʻi i ka manaʻo hoʻopuka