Airflow është një mjet për zhvillimin dhe mirëmbajtjen e lehtë dhe të shpejtë të proceseve të përpunimit të të dhënave në grup

Airflow është një mjet për zhvillimin dhe mirëmbajtjen e lehtë dhe të shpejtë të proceseve të përpunimit të të dhënave në grup

Përshëndetje, Habr! Në këtë artikull dua të flas për një mjet të shkëlqyeshëm për zhvillimin e proceseve të përpunimit të të dhënave në grup, për shembull, në infrastrukturën e një DWH të korporatës ose DataLake tuaj. Ne do të flasim për Apache Airflow (më tej referuar si Airflow). Është e privuar padrejtësisht nga vëmendja për Habré, dhe në pjesën kryesore do të përpiqem t'ju bind se të paktën Airflow ia vlen të shikohet kur zgjidhni një programues për proceset tuaja ETL/ELT.

Më parë, kam shkruar një seri artikujsh mbi temën e DWH kur punoja në Tinkoff Bank. Tani jam bërë pjesë e ekipit të Mail.Ru Group dhe po zhvilloj një platformë për analizën e të dhënave në fushën e lojërave. Në fakt, ndërsa shfaqen lajme dhe zgjidhje interesante, ekipi im dhe unë do të flasim këtu për platformën tonë për analitikën e të dhënave.

prolog

Pra, le të fillojmë. Çfarë është Airflow? Kjo është një bibliotekë (ose grup bibliotekash) për të zhvilluar, planifikuar dhe monitoruar proceset e punës. Tipari kryesor i Airflow: Kodi Python përdoret për të përshkruar (zhvilluar) proceset. Kjo ka shumë përparësi për organizimin e projektit dhe zhvillimit tuaj: në thelb, projekti juaj (për shembull) ETL është thjesht një projekt Python, dhe ju mund ta organizoni atë sipas dëshirës tuaj, duke marrë parasysh specifikat e infrastrukturës, madhësisë së ekipit dhe kërkesa të tjera. Instrumentalisht gjithçka është e thjeshtë. Përdorni për shembull PyCharm + Git. Është e mrekullueshme dhe shumë e përshtatshme!

Tani le të shohim entitetet kryesore të Airflow. Duke kuptuar thelbin dhe qëllimin e tyre, ju mund të organizoni në mënyrë optimale arkitekturën e procesit tuaj. Ndoshta entiteti kryesor është Grafiku Aciklik i Drejtuar (në tekstin e mëtejmë DAG).

Dag

Një DAG është një lidhje kuptimplotë e detyrave tuaja që dëshironi të përfundoni në një sekuencë të përcaktuar rreptësisht sipas një plani specifik. Airflow ofron një ndërfaqe të përshtatshme në internet për të punuar me DAG dhe entitete të tjera:

Airflow është një mjet për zhvillimin dhe mirëmbajtjen e lehtë dhe të shpejtë të proceseve të përpunimit të të dhënave në grup

DAG mund të duket kështu:

Airflow është një mjet për zhvillimin dhe mirëmbajtjen e lehtë dhe të shpejtë të proceseve të përpunimit të të dhënave në grup

Zhvilluesi, kur dizajnon një DAG, përcakton një grup operatorësh mbi të cilët do të ndërtohen detyrat brenda DAG. Këtu kemi ardhur te një entitet tjetër i rëndësishëm: Operatori i Rrjedhës së Ajrit.

Операторы

Një operator është një ent mbi bazën e të cilit krijohen instancat e punës, i cili përshkruan se çfarë do të ndodhë gjatë ekzekutimit të një shembulli pune. Rrjedha e ajrit lëshon nga GitHub tashmë përmbajnë një grup operatorësh të gatshëm për t'u përdorur. Shembuj:

  • BashOperator - operator për ekzekutimin e një komande bash.
  • PythonOperator - operator për thirrjen e kodit Python.
  • EmailOperator - operator për dërgimin e emailit.
  • HTTPOoperator - operator për të punuar me kërkesat http.
  • SqlOperator - operator për ekzekutimin e kodit SQL.
  • Sensori është një operator për pritjen e një ngjarjeje (ardhja e kohës së kërkuar, shfaqja e skedarit të kërkuar, një rresht në bazën e të dhënave, një përgjigje nga API, etj., etj.).

Ka operatorë më specifikë: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Ju gjithashtu mund të zhvilloni operatorë bazuar në karakteristikat tuaja dhe t'i përdorni ato në projektin tuaj. Për shembull, ne krijuam MongoDBToHiveViaHdfsTransfer, një operator për eksportimin e dokumenteve nga MongoDB në Hive, dhe disa operatorë për të punuar me Shtëpi Kliko: CHLoadFromHiveOperator dhe CHTableLoaderOperator. Në thelb, sapo një projekt ka përdorur shpesh kodin e ndërtuar mbi deklaratat bazë, mund të mendoni për ndërtimin e tij në një deklaratë të re. Kjo do të thjeshtojë zhvillimin e mëtejshëm dhe ju do të zgjeroni bibliotekën tuaj të operatorëve në projekt.

Tjetra, të gjitha këto raste të detyrave duhet të ekzekutohen, dhe tani do të flasim për planifikuesin.

Programuesi

Programuesi i detyrave të rrjedhës së ajrit është ndërtuar mbi të Selino. Celery është një bibliotekë Python që ju lejon të organizoni një radhë plus ekzekutimin asinkron dhe të shpërndarë të detyrave. Nga ana e rrjedhës së ajrit, të gjitha detyrat ndahen në pishina. Pishinat krijohen me dorë. Në mënyrë tipike, qëllimi i tyre është të kufizojnë ngarkesën e punës me burimin ose të tipizojnë detyrat brenda DWH. Pishinat mund të menaxhohen përmes ndërfaqes së internetit:

Airflow është një mjet për zhvillimin dhe mirëmbajtjen e lehtë dhe të shpejtë të proceseve të përpunimit të të dhënave në grup

Çdo pishinë ka një kufi në numrin e lojërave elektronike. Kur krijoni një DAG, atij i jepet një grup:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Një grup i përcaktuar në nivel DAG mund të anashkalohet në nivelin e detyrës.
Një proces i veçantë, Scheduler, është përgjegjës për planifikimin e të gjitha detyrave në Airflow. Në fakt, Scheduler merret me të gjithë mekanikën e vendosjes së detyrave për ekzekutim. Detyra kalon nëpër disa faza përpara se të ekzekutohet:

  1. Detyrat e mëparshme janë kryer në DAG, një e re mund të vendoset në radhë.
  2. Radha renditet në varësi të përparësisë së detyrave (përparësitë gjithashtu mund të kontrollohen), dhe nëse ka një vend të lirë në pishinë, detyra mund të vihet në funksion.
  3. Nëse ka një selino punëtore falas, detyra i dërgohet; fillon puna që keni programuar në problem, duke përdorur një ose një operator tjetër.

Mjaft e thjeshtë.

Scheduler funksionon në grupin e të gjitha DAG-ve dhe të gjitha detyrave brenda DAG-ve.

Që Scheduler të fillojë të punojë me DAG, DAG duhet të vendosë një orar:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Ekziston një grup paravendosjesh të gatshme: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Ju gjithashtu mund të përdorni shprehjet cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Data e ekzekutimit

Për të kuptuar se si funksionon Airflow, është e rëndësishme të kuptoni se cila është Data e Ekzekutimit për një DAG. Në Airflow, DAG ka një dimension Date Ekzekutimi, d.m.th., në varësi të orarit të punës së DAG, krijohen shembuj të detyrave për çdo datë ekzekutimi. Dhe për çdo datë ekzekutimi, detyrat mund të riekzekutohen - ose, për shembull, një DAG mund të funksionojë njëkohësisht në disa data të ekzekutimit. Kjo tregohet qartë këtu:

Airflow është një mjet për zhvillimin dhe mirëmbajtjen e lehtë dhe të shpejtë të proceseve të përpunimit të të dhënave në grup

Fatkeqësisht (ose ndoshta për fat: varet nga situata), nëse zbatimi i detyrës në DAG korrigjohet, atëherë ekzekutimi në datën e mëparshme të ekzekutimit do të vazhdojë duke marrë parasysh rregullimet. Kjo është mirë nëse ju duhet të rillogaritni të dhënat në periudhat e kaluara duke përdorur një algoritëm të ri, por është e keqe sepse riprodhueshmëria e rezultatit humbet (sigurisht, askush nuk ju shqetëson të ktheni versionin e kërkuar të kodit burim nga Git dhe të llogarisni se çfarë ju duhet një herë, ashtu siç ju duhet).

Gjenerimi i detyrave

Zbatimi i DAG është kod në Python, kështu që ne kemi një mënyrë shumë të përshtatshme për të zvogëluar sasinë e kodit kur punoni, për shembull, me burime të copëtuara. Le të themi se keni tre copëza MySQL si burim, duhet të ngjiteni në secilën prej tyre dhe të merrni disa të dhëna. Për më tepër, në mënyrë të pavarur dhe paralelisht. Kodi Python në DAG mund të duket si ky:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG duket kështu:

Airflow është një mjet për zhvillimin dhe mirëmbajtjen e lehtë dhe të shpejtë të proceseve të përpunimit të të dhënave në grup

Në këtë rast, mund të shtoni ose hiqni një copëz thjesht duke rregulluar cilësimet dhe duke përditësuar DAG. Të rehatshme!

Ju gjithashtu mund të përdorni gjenerimin më kompleks të kodit, për shembull, të punoni me burime në formën e një baze të dhënash ose të përshkruani një strukturë tabele, një algoritëm për të punuar me një tabelë dhe, duke marrë parasysh veçoritë e infrastrukturës DWH, të gjeneroni një proces për ngarkimin e N tabelave në ruajtjen tuaj. Ose, për shembull, duke punuar me një API që nuk mbështet punën me një parametër në formën e një liste, mund të gjeneroni N detyra në një DAG nga kjo listë, të kufizoni paralelizmin e kërkesave në API në një grup dhe të fshini të dhënat e nevojshme nga API. Fleksibël!

depo

Airflow ka depon e vet backend, një bazë të dhënash (mund të jetë MySQL ose Postgres, ne kemi Postgres), e cila ruan gjendjet e detyrave, DAG-të, cilësimet e lidhjes, variablat globale, etj., etj. Këtu do të doja të them se depoja në Airflow është shumë e thjeshtë (rreth 20 tabela) dhe e përshtatshme nëse doni të ndërtoni ndonjë nga proceset tuaja në krye të tij. Më kujtohen 100500 tabelat në depon e Informatica, të cilat duhej të studioheshin për një kohë të gjatë përpara se të kuptonin se si të ndërtonin një pyetje.

Monitorimi

Duke pasur parasysh thjeshtësinë e depove, ju mund të ndërtoni një proces monitorimi të detyrave që është i përshtatshëm për ju. Ne përdorim një bllok shënimesh në Zeppelin, ku shikojmë statusin e detyrave:

Airflow është një mjet për zhvillimin dhe mirëmbajtjen e lehtë dhe të shpejtë të proceseve të përpunimit të të dhënave në grup

Kjo mund të jetë gjithashtu ndërfaqja në internet e vetë Airflow:

Airflow është një mjet për zhvillimin dhe mirëmbajtjen e lehtë dhe të shpejtë të proceseve të përpunimit të të dhënave në grup

Kodi i rrjedhës së ajrit është me burim të hapur, kështu që ne kemi shtuar sinjalizimin në Telegram. Çdo shembull i ekzekutimit të një detyre, nëse ndodh një gabim, dërgon mesazhe të padëshiruara për grupin në Telegram, ku përbëhet i gjithë ekipi i zhvillimit dhe mbështetjes.

Ne marrim një përgjigje të shpejtë përmes Telegramit (nëse kërkohet) dhe përmes Zeppelin marrim një pamje të përgjithshme të detyrave në Airflow.

Në total

Rrjedha e ajrit është kryesisht burim i hapur dhe nuk duhet të prisni mrekulli prej tij. Jini të përgatitur për të vënë në kohë dhe përpjekje për të ndërtuar një zgjidhje që funksionon. Qëllimi është i arritshëm, më besoni, ia vlen. Shpejtësia e zhvillimit, fleksibiliteti, lehtësia e shtimit të proceseve të reja - do t'ju pëlqejë. Sigurisht, duhet t'i kushtoni shumë vëmendje organizimit të projektit, stabilitetit të vetë rrjedhës së ajrit: mrekullitë nuk ndodhin.

Tani ne kemi Airflow që punon çdo ditë rreth 6,5 mijë detyra. Ata janë mjaft të ndryshëm në karakter. Ka detyra të ngarkimit të të dhënave në DWH kryesore nga shumë burime të ndryshme dhe shumë specifike, ka detyra të llogaritjes së vitrinave brenda DWH kryesore, ka detyra të publikimit të të dhënave në një DWH të shpejtë, ka shumë, shumë detyra të ndryshme - dhe Airflow i përtyp të gjitha ditë pas dite. Duke folur me numra, kjo është 2,3 mijë Detyrat ELT me kompleksitet të ndryshëm brenda DWH (Hadoop), përafërsisht. 2,5 qindra baza të dhënash burime, ky është një ekip nga 4 zhvillues ETL, të cilat ndahen në përpunimin e të dhënave ETL në DWH dhe përpunimin e të dhënave ELT brenda DWH dhe natyrisht më shumë një administrator, i cili merret me infrastrukturën e shërbimit.

Planet për të ardhmen

Numri i proceseve po rritet në mënyrë të pashmangshme dhe gjëja kryesore që do të bëjmë përsa i përket infrastrukturës së Airflow është shkallëzimi. Ne duam të ndërtojmë një grup të rrjedhës së ajrit, të ndajmë një palë këmbë për punëtorët e Selinos dhe të bëjmë një kokë të vetë-dyfishuar me proceset e planifikimit të punës dhe një depo.

epilog

Kjo, natyrisht, nuk është gjithçka që do të doja të tregoja për Airflow, por u përpoqa të theksoja pikat kryesore. Oreksi vjen me të ngrënit, provojeni dhe do t'ju pëlqejë :)

Burimi: www.habr.com

Shto një koment