Airflow ir rÄ«ks, lai ērti un ātri izstrādātu un uzturētu pakeÅ”datu apstrādes procesus

Airflow ir rÄ«ks, lai ērti un ātri izstrādātu un uzturētu pakeÅ”datu apstrādes procesus

Sveiks, Habr! Å ajā rakstā es vēlos runāt par vienu lielisku rÄ«ku pakeÅ”u datu apstrādes procesu izstrādei, piemēram, korporatÄ«vā DWH vai jÅ«su DataLake infrastruktÅ«rā. Mēs runāsim par Apache Airflow (turpmāk tekstā Airflow). Tam ir negodÄ«gi atņemta HabrĆ© uzmanÄ«ba, un galvenajā daļā es mēģināŔu jÅ«s pārliecināt, ka vismaz Airflow ir vērts aplÅ«kot, izvēloties plānotāju saviem ETL/ELT procesiem.

IepriekÅ” es rakstÄ«ju rakstu sēriju par DWH tēmu, kad strādāju Tinkoff Bank. Tagad esmu kļuvis par daļu no Mail.Ru Group komandas un izstrādāju platformu datu analÄ«zei spēļu jomā. Faktiski, parādoties jaunumiem un interesantiem risinājumiem, es un mana komanda Å”eit runāsim par mÅ«su datu analÄ«zes platformu.

Prologs

Tātad, sāksim. Kas ir gaisa plÅ«sma? Å Ä« ir bibliotēka (vai bibliotēku komplekts) izstrādāt, plānot un uzraudzÄ«t darba procesus. Galvenā Airflow iezÄ«me: Python kods tiek izmantots, lai aprakstÄ«tu (attÄ«stÄ«tu) procesus. Tam ir daudz priekÅ”rocÄ«bu jÅ«su projekta un attÄ«stÄ«bas organizÄ“Å”anā: bÅ«tÄ«bā jÅ«su (piemēram) ETL projekts ir tikai Python projekts, un jÅ«s varat to organizēt kā vēlaties, ņemot vērā infrastruktÅ«ras specifiku, komandas lielumu un citas prasÄ«bas. Instrumentāli viss ir vienkārÅ”i. Izmantojiet, piemēram, PyCharm + Git. Tas ir brÄ«niŔķīgi un ļoti ērti!

Tagad apskatÄ«sim galvenās Airflow vienÄ«bas. Izprotot to bÅ«tÄ«bu un mērÄ·i, jÅ«s varat optimāli sakārtot savu procesu arhitektÅ«ru. Iespējams, galvenā vienÄ«ba ir virzÄ«tais acikliskais grafiks (turpmāk tekstā ā€“ DAG).

DAG

DAG ir jÅ«su uzdevumu semantiska saistÄ«ba, ko vēlaties izpildÄ«t stingri noteiktā secÄ«bā saskaņā ar noteiktu grafiku. Airflow nodroÅ”ina ērtu tÄ«mekļa saskarni darbam ar DAG un citām entÄ«tijām:

Airflow ir rÄ«ks, lai ērti un ātri izstrādātu un uzturētu pakeÅ”datu apstrādes procesus

DAG varētu izskatÄ«ties Ŕādi:

Airflow ir rÄ«ks, lai ērti un ātri izstrādātu un uzturētu pakeÅ”datu apstrādes procesus

Izstrādātājs, izstrādājot DAG, nosaka operatoru kopumu, uz kuriem tiks balstīti uzdevumi DAG ietvaros. Šeit mēs nonākam pie citas svarīgas vienības: gaisa plūsmas operators.

Operatori

Operators ir entÄ«tija, uz kuras pamata tiek izveidotas darba instances, kas apraksta, kas notiks darba instances izpildes laikā. Gaisa plÅ«smas izlaidumi no GitHub jau satur lietoÅ”anai gatavu operatoru kopu. Piemēri:

  • BashOperator - operators bash komandas izpildei.
  • PythonOperator - operators Python koda izsaukÅ”anai.
  • EmailOperator ā€” operators e-pasta sÅ«tÄ«Å”anai.
  • HTTPOperator - operators darbam ar http pieprasÄ«jumiem.
  • SqlOperator - operators SQL koda izpildei.
  • Sensors ir operators notikuma gaidÄ«Å”anai (vajadzÄ«gā laika ieraÅ”anās, vajadzÄ«gā faila parādÄ«Å”anās, rinda datu bāzē, atbilde no API utt. utt.).

Ir specifiskāki operatori: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Varat arÄ« izstrādāt operatorus, pamatojoties uz savām Ä«paŔībām, un izmantot tos savā projektā. Piemēram, mēs izveidojām MongoDBToHiveViaHdfsTransfer, operatoru dokumentu eksportÄ“Å”anai no MongoDB uz Hive, un vairākus operatorus darbam ar NoklikŔķiniet uz Māja: CHLoadFromHiveOperator un CHTableLoaderOperator. BÅ«tÄ«bā, tiklÄ«dz projektā bieži tiek izmantots kods, kas balstÄ«ts uz pamata paziņojumiem, varat domāt par tā izveidi jaunā paziņojumā. Tas vienkārÅ”os turpmāko attÄ«stÄ«bu, un jÅ«s paplaÅ”ināsit savu operatoru bibliotēku projektā.

Tālāk visi Å”ie uzdevumu gadÄ«jumi ir jāizpilda, un tagad mēs runāsim par plānotāju.

Plānotājs

Ir izveidots Airflow uzdevumu plānotājs Selerija. Selery ir Python bibliotēka, kas ļauj organizēt rindu, kā arī asinhronu un sadalītu uzdevumu izpildi. Gaisa plūsmas pusē visi uzdevumi ir sadalīti baseinos. Baseini tiek izveidoti manuāli. Parasti to mērķis ir ierobežot darba slodzi darbā ar avotu vai tipizēt uzdevumus DWH ietvaros. Baseinus var pārvaldīt, izmantojot tīmekļa saskarni:

Airflow ir rÄ«ks, lai ērti un ātri izstrādātu un uzturētu pakeÅ”datu apstrādes procesus

Katram baseinam ir laika niŔu skaita ierobežojums. Izveidojot DAG, tam tiek pieŔķirts baseins:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

DAG līmenī definētu pūlu var ignorēt uzdevuma līmenī.
AtseviŔķs process Plānotājs ir atbildīgs par visu Airflow uzdevumu plānoŔanu. Faktiski plānotājs nodarbojas ar visu izpildes uzdevumu iestatīŔanas mehāniku. Pirms izpildes uzdevums iziet vairākus posmus:

  1. IepriekŔējie uzdevumi DAG ir izpildÄ«ti, var likt jaunu.
  2. Rinda tiek sakārtota atkarībā no uzdevumu prioritātes (var arī kontrolēt prioritātes), un, ja baseinā ir brīva vieta, uzdevumu var nodot ekspluatācijā.
  3. Ja ir brīvs strādnieks selerija, uzdevums tiek nosūtīts tai; sākas darbs, ko ieprogrammējāt problēmā, izmantojot vienu vai otru operatoru.

Pietiekami vienkārŔi.

Plānotājs darbojas uz visu DAG kopu un visiem uzdevumiem DAG ietvaros.

Lai plānotājs varētu sākt strādāt ar DAG, DAG ir jāiestata grafiks:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Ir gatavu iestatījumu komplekts: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Varat arī izmantot cron izteiksmes:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Izpildes datums

Lai saprastu, kā darbojas Airflow, ir svarÄ«gi saprast, kas ir DAG izpildes datums. Programmā Airflow DAG ir izpildes datuma dimensija, t.i., atkarÄ«bā no DAG darba grafika katram izpildes datumam tiek izveidoti uzdevumu gadÄ«jumi. Un katram izpildes datumam uzdevumus var izpildÄ«t atkārtoti - vai, piemēram, DAG var strādāt vienlaikus vairākos izpildes datumos. Tas ir skaidri parādÄ«ts Å”eit:

Airflow ir rÄ«ks, lai ērti un ātri izstrādātu un uzturētu pakeÅ”datu apstrādes procesus

Diemžēl (vai varbÅ«t par laimi: tas ir atkarÄ«gs no situācijas), ja uzdevuma izpilde DAG tiks labota, tad izpilde iepriekŔējā Izpildes datumā turpināsies, ņemot vērā korekcijas. Tas ir labi, ja ir jāpārrēķina dati par pagājuÅ”ajiem periodiem, izmantojot jaunu algoritmu, bet tas ir slikti, jo tiek zaudēta rezultāta reproducējamÄ«ba (protams, neviens netraucē atgriezt nepiecieÅ”amo avota koda versiju no Git un aprēķināt, kas jums vajag vienu reizi, tā, kā jums tas ir nepiecieÅ”ams).

Uzdevumu ģenerēŔana

DAG ievieÅ”ana ir kods Python, tāpēc mums ir ļoti ērts veids, kā samazināt koda daudzumu, strādājot, piemēram, ar Ŕķeltajiem avotiem. Pieņemsim, ka jums ir trÄ«s MySQL shards kā avots, jums jāiekļaujas katrā un jāiegÅ«st daži dati. Turklāt neatkarÄ«gi un paralēli. Python kods DAG var izskatÄ«ties Ŕādi:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG izskatās Ŕādi:

Airflow ir rÄ«ks, lai ērti un ātri izstrādātu un uzturētu pakeÅ”datu apstrādes procesus

Å ajā gadÄ«jumā jÅ«s varat pievienot vai noņemt Ŕķembu, vienkārÅ”i pielāgojot iestatÄ«jumus un atjauninot DAG. Ērti!

Varat arÄ« izmantot sarežģītāku koda Ä£enerÄ“Å”anu, piemēram, strādāt ar avotiem datu bāzes veidā vai aprakstÄ«t tabulas struktÅ«ru, algoritmu darbam ar tabulu un, ņemot vērā DWH infrastruktÅ«ras iezÄ«mes, Ä£enerēt procesu. lai ielādētu N tabulu savā krātuvē. Vai, piemēram, strādājot ar API, kas neatbalsta darbu ar parametru saraksta veidā, varat Ä£enerēt N uzdevumus DAG no Ŕī saraksta, ierobežot API pieprasÄ«jumu paralēlismu lÄ«dz pÅ«lam un nokasÄ«t nepiecieÅ”amie dati no API. ElastÄ«gi!

krātuve

Airflow ir savs aizmugures repozitorijs, datu bāze (var bÅ«t MySQL vai Postgres, mums ir Postgres), kurā tiek glabāti uzdevumu stāvokļi, DAG, savienojuma iestatÄ«jumi, globālie mainÄ«gie utt., utt. Å eit es gribētu teikt, ka Repozitorijs programmā Airflow ir ļoti vienkārÅ”s (apmēram 20 tabulas) un ērts, ja vēlaties tai papildināt kādu no saviem procesiem. Atceros 100500 XNUMX tabulas Informatica repozitorijā, kuras bija ilgi jāpēta, pirms sapratu, kā veidot vaicājumu.

Uzraudzība

Ņemot vērā repozitorija vienkārŔību, varat izveidot sev ērtu uzdevumu uzraudzÄ«bas procesu. Mēs Zeppelin izmantojam piezÄ«mju grāmatiņu, kurā aplÅ«kojam uzdevumu statusu:

Airflow ir rÄ«ks, lai ērti un ātri izstrādātu un uzturētu pakeÅ”datu apstrādes procesus

Tā varētu bÅ«t arÄ« paÅ”as Airflow tÄ«mekļa saskarne:

Airflow ir rÄ«ks, lai ērti un ātri izstrādātu un uzturētu pakeÅ”datu apstrādes procesus

Airflow kods ir atvērtā koda, tāpēc mēs esam pievienojuÅ”i brÄ«dinājumus Telegram. Katrs palaist uzdevuma gadÄ«jums, ja rodas kļūda, izsÅ«ta surogātpastu Telegram grupai, kurā ir visa izstrādes un atbalsta komanda.

Mēs saņemam tÅ«lÄ«tēju atbildi, izmantojot Telegram (ja nepiecieÅ”ams), un caur Zeppelin mēs saņemam vispārēju priekÅ”statu par uzdevumiem Airflow.

Kopā

Gaisa plÅ«sma galvenokārt ir atvērtā koda avots, un no tā nevajadzētu gaidÄ«t brÄ«numus. Esiet gatavs veltÄ«t laiku un pÅ«les, lai izveidotu risinājumu, kas darbojas. MērÄ·is ir sasniedzams, ticiet man, tas ir tā vērts. AttÄ«stÄ«bas ātrums, elastÄ«ba, jaunu procesu pievienoÅ”anas vieglums - jums tas patiks. Protams, liela uzmanÄ«ba jāpievērÅ” projekta organizācijai, paÅ”as Airflow stabilitātei: brÄ«numi nenotiek.

Tagad Airflow strādā katru dienu apmēram 6,5 tÅ«kstoÅ”i uzdevumu. Tie ir diezgan atŔķirÄ«gi pēc rakstura. Ir uzdevumi ielādēt datus galvenajā DWH no daudziem dažādiem un ļoti specifiskiem avotiem, ir uzdevumi, lai aprēķinātu veikalu fasādes galvenajā DWH, ir uzdevumi publicēt datus ātrā DWH, ir daudz, daudz dažādu uzdevumu - un Airflow. koŔļā tos visus dienu no dienas. Runājot skaitļos, tas ir 2,3 tÅ«kstoÅ”i Dažādas sarežģītÄ«bas ELT uzdevumi DWH (Hadoop) ietvaros, apm. 2,5 simti datu bāzu avotiem, Ŕī ir komanda no 4 ETL izstrādātāji, kas ir sadalÄ«ti ETL datu apstrādē DWH un ELT datu apstrādē DWH iekÅ”ienē un, protams, vairāk viens admins, kas nodarbojas ar pakalpojuma infrastruktÅ«ru.

Plāni nākotnei

Procesu skaits neizbēgami pieaug, un galvenais, ko mēs darÄ«sim saistÄ«bā ar Airflow infrastruktÅ«ru, ir mērogoÅ”ana. Mēs vēlamies izveidot Airflow klasteru, pieŔķirt Selery strādniekiem kāju pāri un izveidot paÅ”dublējoÅ”u galvu ar darba plānoÅ”anas procesiem un repozitoriju.

Epilogs

Tas, protams, nav viss, ko es vēlētos pastāstīt par Airflow, bet es centos izcelt galvenos punktus. Apetīte nāk ēdot, pamēģini un tev patiks :)

Avots: www.habr.com

Pievieno komentāru