Ṣiṣan afẹfẹ jẹ ohun elo fun irọrun ati idagbasoke ni iyara ati mimu awọn ilana ṣiṣe data ipele ipele

Ṣiṣan afẹfẹ jẹ ohun elo fun irọrun ati idagbasoke ni iyara ati mimu awọn ilana ṣiṣe data ipele ipele

Kaabo, Habr! Ninu nkan yii Mo fẹ lati sọrọ nipa ohun elo nla kan fun idagbasoke awọn ilana ṣiṣe data ipele, fun apẹẹrẹ, ninu awọn amayederun ti DWH ile-iṣẹ tabi DataLake rẹ. A yoo sọrọ nipa Apache Airflow (lẹhinna tọka si Airflow). O jẹ aiṣedeede finnufindo akiyesi lori Habré, ati ni apakan akọkọ Emi yoo gbiyanju lati parowa fun ọ pe o kere ju Airflow tọ lati wo nigbati o yan oluṣeto fun awọn ilana ETL / ELT rẹ.

Ni iṣaaju, Mo ti kowe kan lẹsẹsẹ ti awọn nkan lori koko ti DWH nigbati mo sise ni Tinkoff Bank. Bayi Mo ti di apakan ti ẹgbẹ Ẹgbẹ Mail.Ru ati pe Mo n ṣe agbekalẹ pẹpẹ kan fun itupalẹ data ni agbegbe ere. Lootọ, bi awọn iroyin ati awọn solusan ti o nifẹ han, ẹgbẹ mi ati Emi yoo sọrọ nibi nipa pẹpẹ wa fun awọn itupalẹ data.

Àsọyé

Nitorinaa, jẹ ki a bẹrẹ. Kí ni Airflow? Eyi jẹ ile-ikawe kan (tabi ṣeto ti ikawe) lati se agbekale, gbero ati atẹle awọn ilana iṣẹ. Ẹya akọkọ ti Airflow: Python koodu ti lo lati ṣe apejuwe awọn ilana (dagbasoke). Eyi ni awọn anfani pupọ fun siseto iṣẹ akanṣe ati idagbasoke rẹ: ni pataki, iṣẹ akanṣe rẹ (fun apẹẹrẹ) ETL jẹ iṣẹ akanṣe Python kan, ati pe o le ṣeto bi o ṣe fẹ, ni akiyesi awọn pato ti awọn amayederun, iwọn ẹgbẹ ati miiran awọn ibeere. Instrumentally ohun gbogbo ni o rọrun. Lo fun apẹẹrẹ PyCharm + Git. O jẹ iyanu ati irọrun pupọ!

Bayi jẹ ki a wo awọn nkan akọkọ ti Airflow. Nipa agbọye idi pataki wọn ati idi, o le ṣeto eto faaji ilana rẹ ni aipe. Boya nkan akọkọ ni Aworan Acyclic Dari (lẹhinna tọka si DAG).

DAG

A DAG jẹ diẹ ninu awọn ẹgbẹ ti o nilari ti awọn iṣẹ ṣiṣe rẹ ti o fẹ lati pari ni ọna ti a ṣalaye ni muna ni ibamu si iṣeto kan pato. Airflow n pese wiwo oju opo wẹẹbu ti o rọrun fun ṣiṣẹ pẹlu DAGs ati awọn nkan miiran:

Ṣiṣan afẹfẹ jẹ ohun elo fun irọrun ati idagbasoke ni iyara ati mimu awọn ilana ṣiṣe data ipele ipele

DAG le dabi eyi:

Ṣiṣan afẹfẹ jẹ ohun elo fun irọrun ati idagbasoke ni iyara ati mimu awọn ilana ṣiṣe data ipele ipele

Olùgbéejáde, nigbati o ba n ṣe apẹrẹ DAG, fi ipilẹ awọn oniṣẹ silẹ lori eyiti awọn iṣẹ-ṣiṣe laarin DAG yoo kọ. Nibi a wa si nkan pataki miiran: Oṣiṣẹ Airflow.

Awọn oniṣẹ

Oniṣẹ jẹ ẹya lori ipilẹ ti awọn iṣẹlẹ iṣẹ ti ṣẹda, eyiti o ṣe apejuwe ohun ti yoo ṣẹlẹ lakoko ipaniyan ti apẹẹrẹ iṣẹ kan. Awọn idasilẹ ṣiṣan afẹfẹ lati GitHub tẹlẹ ninu ṣeto awọn oniṣẹ ti o ṣetan lati lo. Awọn apẹẹrẹ:

  • BashOperator - oniṣẹ fun ṣiṣe pipaṣẹ bash kan.
  • PythonOperator - oniṣẹ fun pipe Python koodu.
  • ImeeliOperator - oniṣẹ fun fifiranṣẹ imeeli.
  • HTTPOperator – oniṣẹ fun ṣiṣẹ pẹlu http awọn ibeere.
  • SqlOperator - oniṣẹ fun ṣiṣe koodu SQL.
  • Sensọ jẹ oniṣẹ kan fun iduro fun iṣẹlẹ kan (dide ti akoko ti o nilo, irisi faili ti o nilo, laini kan ninu ibi ipamọ data, idahun lati API, ati bẹbẹ lọ, ati bẹbẹ lọ).

Awọn oniṣẹ kan pato diẹ sii wa: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

O tun le ṣe agbekalẹ awọn oniṣẹ ti o da lori awọn abuda tirẹ ati lo wọn ninu iṣẹ akanṣe rẹ. Fun apẹẹrẹ, a ṣẹda MongoDBToHiveViaHdfsTransfer, oniṣẹ ẹrọ fun gbigbe awọn iwe aṣẹ okeere lati MongoDB si Ile Agbon, ati ọpọlọpọ awọn oniṣẹ fun ṣiṣẹ pẹlu Tẹ Ile: CHLoadFromHiveOperator ati CHTableLoaderOperator. Ni pataki, ni kete ti iṣẹ akanṣe kan ti lo koodu nigbagbogbo ti a ṣe lori awọn alaye ipilẹ, o le ronu nipa kikọ rẹ sinu alaye tuntun kan. Eleyi yoo simplify siwaju idagbasoke, ati awọn ti o yoo faagun rẹ ìkàwé ti awọn oniṣẹ ninu ise agbese.

Nigbamii, gbogbo awọn iṣẹlẹ wọnyi ti awọn iṣẹ-ṣiṣe nilo lati ṣiṣẹ, ati ni bayi a yoo sọrọ nipa oluṣeto.

Eto iṣeto

Oluṣeto iṣẹ-ṣiṣe Airflow ti wa ni itumọ ti lori Seleri. Seleri jẹ ile-ikawe Python ti o fun ọ laaye lati ṣeto isinyi pẹlu asynchronous ati ipaniyan pinpin awọn iṣẹ ṣiṣe. Ni ẹgbẹ Airflow, gbogbo awọn iṣẹ-ṣiṣe ti pin si awọn adagun omi. Awọn adagun omi ni a ṣẹda pẹlu ọwọ. Ni deede, idi wọn ni lati ṣe idinwo iṣẹ ṣiṣe ti ṣiṣẹ pẹlu orisun tabi lati ṣapejuwe awọn iṣẹ ṣiṣe laarin DWH. Awọn adagun omi le ṣee ṣakoso nipasẹ wiwo wẹẹbu:

Ṣiṣan afẹfẹ jẹ ohun elo fun irọrun ati idagbasoke ni iyara ati mimu awọn ilana ṣiṣe data ipele ipele

Kọọkan pool ni o ni a iye to lori awọn nọmba ti iho . Nigbati o ba ṣẹda DAG, o fun ni adagun kan:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Adagun adagun ti a ṣalaye ni ipele DAG le jẹ agbekọja ni ipele iṣẹ-ṣiṣe.
Ilana ti o yatọ, Alakoso, jẹ iduro fun siseto gbogbo awọn iṣẹ-ṣiṣe ni Airflow. Lootọ, Iṣeto n ṣowo pẹlu gbogbo awọn ẹrọ ṣiṣe eto awọn iṣẹ ṣiṣe fun ipaniyan. Iṣẹ naa lọ nipasẹ awọn ipele pupọ ṣaaju ṣiṣe:

  1. Awọn iṣẹ ṣiṣe ti tẹlẹ ti pari ni DAG; tuntun le ti wa ni isinyi.
  2. Awọn ti isinyi ti wa ni lẹsẹsẹ da lori ayo awọn iṣẹ-ṣiṣe (awọn ayo tun le dari), ati ti o ba ti wa ni free Iho ni pool, awọn iṣẹ-ṣiṣe le wa ni ya sinu isẹ.
  3. Ti seleri oṣiṣẹ ọfẹ kan wa, iṣẹ naa ni a firanṣẹ si rẹ; Iṣẹ ti o ṣeto ninu iṣoro naa bẹrẹ, ni lilo ọkan tabi oniṣẹ ẹrọ miiran.

Rọrun to.

Iṣeto nṣiṣẹ lori ṣeto gbogbo awọn DAGs ati gbogbo awọn iṣẹ-ṣiṣe laarin awọn DAG.

Fun Alakoso lati bẹrẹ ṣiṣẹ pẹlu DAG, DAG nilo lati ṣeto iṣeto kan:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Eto ti awọn tito-ṣeto wa: @once, @hourly, @daily, @weekly, @monthly, @yearly.

O tun le lo awọn ikosile cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Ọjọ ipaniyan

Lati ni oye bi Airflow ṣe n ṣiṣẹ, o ṣe pataki lati ni oye kini Ọjọ Ipaniyan jẹ fun DAG kan. Ni Airflow, DAG ni iwọn Ọjọ Ipaniyan, ie, da lori iṣeto iṣẹ DAG, awọn iṣẹlẹ iṣẹ-ṣiṣe ni a ṣẹda fun Ọjọ Ipese kọọkan. Ati fun Ọjọ Ipaniyan kọọkan, awọn iṣẹ-ṣiṣe le tun-ṣe - tabi, fun apẹẹrẹ, DAG le ṣiṣẹ ni igbakanna ni ọpọlọpọ Awọn Ọjọ Ipaniyan. Eyi han kedere nibi:

Ṣiṣan afẹfẹ jẹ ohun elo fun irọrun ati idagbasoke ni iyara ati mimu awọn ilana ṣiṣe data ipele ipele

Laanu (tabi boya daadaa: o da lori ipo naa), ti imuse ti iṣẹ-ṣiṣe ni DAG jẹ atunṣe, lẹhinna ipaniyan ni Ọjọ Ipaniyan ti tẹlẹ yoo tẹsiwaju ni akiyesi awọn atunṣe. Eyi dara ti o ba nilo lati ṣe atunto data ni awọn akoko ti o kọja nipa lilo algoridimu tuntun, ṣugbọn o buru nitori atunṣe ti abajade ti sọnu (dajudaju, ko si ẹnikan ti o yọ ọ lẹnu lati pada ẹya ti o nilo ti koodu orisun lati Git ati ṣe iṣiro kini kini o nilo akoko kan, ọna ti o nilo rẹ).

Ti o npese awọn iṣẹ-ṣiṣe

Awọn imuse ti DAG jẹ koodu ni Python, nitorina a ni ọna ti o rọrun pupọ lati dinku iye koodu nigba ti o ṣiṣẹ, fun apẹẹrẹ, pẹlu awọn orisun ti a ti pin. Jẹ ki a sọ pe o ni awọn shards MySQL mẹta bi orisun, o nilo lati gun sinu ọkọọkan ki o gbe diẹ ninu awọn data. Pẹlupẹlu, ni ominira ati ni afiwe. Koodu Python ni DAG le dabi eyi:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG dabi eyi:

Ṣiṣan afẹfẹ jẹ ohun elo fun irọrun ati idagbasoke ni iyara ati mimu awọn ilana ṣiṣe data ipele ipele

Ni idi eyi, o le ṣafikun tabi yọ shard kan kuro nipa ṣiṣatunṣe awọn eto nikan ati mimudojuiwọn DAG. Itura!

O tun le lo iran koodu eka diẹ sii, fun apẹẹrẹ, ṣiṣẹ pẹlu awọn orisun ni irisi data data tabi ṣapejuwe eto tabili kan, algorithm kan fun ṣiṣẹ pẹlu tabili kan, ati, ni akiyesi awọn ẹya ti awọn amayederun DWH, ṣe agbekalẹ ilana kan. fun ikojọpọ N tabili sinu rẹ ipamọ. Tabi, fun apẹẹrẹ, ṣiṣẹ pẹlu API ti ko ṣe atilẹyin ṣiṣẹ pẹlu paramita ni irisi atokọ kan, o le ṣe ina awọn iṣẹ-ṣiṣe N ni DAG lati inu atokọ yii, ṣe idinwo isọdọkan ti awọn ibeere ni API si adagun-odo kan, ki o si parẹ. data pataki lati API. Rọ!

ibi ipamọ

Airflow ni ibi ipamọ ẹhin ti ara rẹ, ibi ipamọ data (le jẹ MySQL tabi Postgres, a ni Postgres), eyiti o tọju awọn ipinlẹ ti awọn iṣẹ-ṣiṣe, DAGs, awọn eto asopọ, awọn oniyipada agbaye, bbl, bbl Nibi Emi yoo fẹ Mo le sọ pe awọn ibi ipamọ ni Airflow jẹ rọrun pupọ (nipa awọn tabili 20) ati irọrun ti o ba fẹ kọ eyikeyi awọn ilana tirẹ lori oke rẹ. Mo ranti awọn tabili 100500 ni ibi ipamọ Informatica, eyiti o ni lati ṣe iwadi fun igba pipẹ ṣaaju oye bi o ṣe le kọ ibeere kan.

Abojuto

Fi fun ayedero ti ibi ipamọ, o le kọ ilana ibojuwo iṣẹ ti o rọrun fun ọ. A lo iwe akiyesi ni Zeppelin, nibiti a ti wo ipo awọn iṣẹ-ṣiṣe:

Ṣiṣan afẹfẹ jẹ ohun elo fun irọrun ati idagbasoke ni iyara ati mimu awọn ilana ṣiṣe data ipele ipele

Eyi tun le jẹ wiwo wẹẹbu ti Airflow funrararẹ:

Ṣiṣan afẹfẹ jẹ ohun elo fun irọrun ati idagbasoke ni iyara ati mimu awọn ilana ṣiṣe data ipele ipele

Koodu Airflow jẹ orisun ṣiṣi, nitorinaa a ti ṣafikun titaniji si Telegram. Apeere nṣiṣẹ kọọkan ti iṣẹ-ṣiṣe kan, ti aṣiṣe ba waye, spams ẹgbẹ ni Telegram, nibiti gbogbo idagbasoke ati ẹgbẹ atilẹyin jẹ.

A gba esi kiakia nipasẹ Telegram (ti o ba nilo), ati nipasẹ Zeppelin a gba aworan gbogbogbo ti awọn iṣẹ-ṣiṣe ni Airflow.

Lapapọ

Ṣiṣan afẹfẹ jẹ orisun ṣiṣi akọkọ, ati pe o ko yẹ ki o reti awọn iṣẹ iyanu lati ọdọ rẹ. Ṣetan lati fi akoko ati igbiyanju lati kọ ojutu kan ti o ṣiṣẹ. Ibi-afẹde naa ṣee ṣe, gbagbọ mi, o tọ si. Iyara ti idagbasoke, irọrun, irọrun ti ṣafikun awọn ilana tuntun - iwọ yoo fẹran rẹ. Dajudaju, o nilo lati san ifojusi pupọ si iṣeto ti agbese na, iduroṣinṣin ti Airflow funrararẹ: awọn iṣẹ iyanu ko ṣẹlẹ.

Bayi a ni Airflow ṣiṣẹ lojoojumọ nipa 6,5 ​​ẹgbẹrun awọn iṣẹ-ṣiṣe. Wọn yatọ pupọ ni ihuwasi. Awọn iṣẹ ṣiṣe ti ikojọpọ data sinu DWH akọkọ lati ọpọlọpọ awọn oriṣiriṣi ati awọn orisun kan pato, awọn iṣẹ ṣiṣe ti iṣiro awọn ile itaja wa ninu DWH akọkọ, awọn iṣẹ ṣiṣe ti titẹ data sinu DWH iyara, ọpọlọpọ, ọpọlọpọ awọn iṣẹ ṣiṣe ti o yatọ - ati Airflow máa ń jẹ gbogbo wọn lójoojúmọ́. Ti sọrọ ni awọn nọmba, eyi ni 2,3 ẹgbẹrun Awọn iṣẹ-ṣiṣe ELT ti iyatọ iyatọ laarin DWH (Hadoop), isunmọ. 2,5 ọgọrun infomesonu awọn orisun, yi ni a egbe lati 4 ETL kóòdù, eyi ti o pin si ETL data processing ni DWH ati ELT data processing inu DWH ati ti awọn dajudaju siwaju sii ọkan admin, ti o ṣe pẹlu awọn amayederun ti iṣẹ naa.

Eto fun ojo iwaju

Nọmba awọn ilana jẹ eyiti o dagba, ati pe ohun akọkọ ti a yoo ṣe ni awọn ofin ti awọn amayederun Airflow jẹ iwọn. A fẹ lati kọ iṣupọ Airflow kan, pin awọn ẹsẹ meji fun awọn oṣiṣẹ Seleri, ati ṣe ori ẹda-ẹda kan pẹlu awọn ilana ṣiṣe eto iṣẹ ati ibi ipamọ kan.

Imudaniloju

Eyi, dajudaju, kii ṣe ohun gbogbo ti Emi yoo fẹ sọ nipa Airflow, ṣugbọn Mo gbiyanju lati ṣe afihan awọn aaye akọkọ. Idunnu wa pẹlu jijẹ, gbiyanju ati pe iwọ yoo fẹran rẹ :)

orisun: www.habr.com

Fi ọrọìwòye kun