Airflow er tæki til að þróa og viðhalda lotugagnavinnsluferlum á þægilegan og fljótlegan hátt

Airflow er tæki til að þróa og viðhalda lotugagnavinnsluferlum á þægilegan og fljótlegan hátt

Halló, Habr! Í þessari grein vil ég tala um eitt frábært tól til að þróa lotugagnavinnsluferli, til dæmis í innviðum fyrirtækis DWH eða DataLake þíns. Við munum tala um Apache Airflow (hér eftir nefnt Airflow). Það er á ósanngjarnan hátt svipt athygli á Habré og í meginhlutanum mun ég reyna að sannfæra þig um að að minnsta kosti Airflow sé þess virði að skoða þegar þú velur tímaáætlun fyrir ETL/ELT ferla þína.

Áður skrifaði ég röð greina um efnið DWH þegar ég vann hjá Tinkoff Bank. Nú er ég orðinn hluti af Mail.Ru Group teyminu og er að þróa vettvang fyrir gagnagreiningu á leikjasvæðinu. Reyndar, þegar fréttir og áhugaverðar lausnir birtast, munum við teymi mitt og ég tala hér um vettvang okkar fyrir gagnagreiningar.

Prologue

Svo, við skulum byrja. Hvað er loftflæði? Þetta er bókasafn (eða sett af bókasöfnum) að þróa, skipuleggja og fylgjast með verkferlum. Helstu eiginleikar Airflow: Python kóða er notaður til að lýsa (þróa) ferlum. Þetta hefur marga kosti til að skipuleggja verkefnið þitt og þróun: í rauninni er (til dæmis) ETL verkefnið þitt bara Python verkefni og þú getur skipulagt það eins og þú vilt, að teknu tilliti til sérstakra innviða, hópstærðar og aðrar kröfur. Hljóðfæralega er allt einfalt. Notaðu til dæmis PyCharm + Git. Það er dásamlegt og mjög þægilegt!

Nú skulum við líta á helstu einingar Airflow. Með því að skilja kjarna þeirra og tilgang geturðu skipulagt ferlaarkitektúr þinn á sem bestan hátt. Kannski er aðaleiningin Stýrða ósýklíska grafið (hér eftir nefnt DAG).

DAG

DAG er einhver þýðingarmikil tenging verkefna þinna sem þú vilt klára í strangt skilgreindri röð samkvæmt ákveðinni áætlun. Airflow veitir þægilegt vefviðmót til að vinna með DAG og öðrum aðilum:

Airflow er tæki til að þróa og viðhalda lotugagnavinnsluferlum á þægilegan og fljótlegan hátt

DAG gæti litið svona út:

Airflow er tæki til að þróa og viðhalda lotugagnavinnsluferlum á þægilegan og fljótlegan hátt

Framkvæmdaraðili, þegar hann hannar DAG, leggur fyrir hóp rekstraraðila sem verkefni innan DAG verða byggð á. Hér komum við að annarri mikilvægri einingu: Airflow Operator.

Stjórnandi

Rekstraraðili er eining á grundvelli hvaða verktilvik eru búin til, sem lýsir því sem mun gerast við framkvæmd verktilviks. Loftflæði losar frá GitHub innihalda nú þegar sett af rekstraraðilum sem eru tilbúnir til notkunar. Dæmi:

  • BashOperator - stjórnandi til að framkvæma bash skipun.
  • PythonOperator - símafyrirtæki til að hringja í Python kóða.
  • EmailOperator — símafyrirtæki til að senda tölvupóst.
  • HTTPOperator - rekstraraðili til að vinna með http beiðnir.
  • SqlOperator - rekstraraðili til að keyra SQL kóða.
  • Skynjari er rekstraraðili til að bíða eftir atburði (koma tilskilins tíma, útlit nauðsynlegrar skráar, lína í gagnagrunninum, svar frá API osfrv., osfrv.).

Það eru sértækari rekstraraðilar: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Þú getur líka þróað rekstraraðila út frá eigin eiginleikum þínum og notað þá í verkefninu þínu. Til dæmis bjuggum við til MongoDBToHiveViaHdfsTransfer, rekstraraðila til að flytja út skjöl frá MongoDB til Hive, og nokkra rekstraraðila til að vinna með smellahús: CHLoadFromHiveOperator og CHTableLoaderOperator. Í meginatriðum, um leið og verkefni hefur oft notað kóða sem byggður er á grunnyfirlýsingum, geturðu hugsað um að byggja hann inn í nýja yfirlýsingu. Þetta mun einfalda frekari þróun og þú munt stækka safnið þitt af rekstraraðilum í verkefninu.

Næst þarf að framkvæma öll þessi tilvik af verkefnum og nú munum við tala um tímaáætlunina.

Dagskrármaður

Verkefnaáætlun Airflow er byggð á Sellerí. Sellerí er Python bókasafn sem gerir þér kleift að skipuleggja biðröð ásamt ósamstilltri og dreifðri framkvæmd verkefna. Loftflæðismegin er öllum verkefnum skipt í laugar. Laugar eru búnar til handvirkt. Venjulega er tilgangur þeirra að takmarka vinnuálag við að vinna með upprunann eða að tákna verkefni innan DWH. Hægt er að stjórna laugum í gegnum vefviðmótið:

Airflow er tæki til að þróa og viðhalda lotugagnavinnsluferlum á þægilegan og fljótlegan hátt

Hver laug hefur takmörk á fjölda spilakassa. Þegar DAG er búið til fær hann laug:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Hægt er að hnekkja laug sem er skilgreind á DAG stigi á verkefnastigi.
Sérstakt ferli, Scheduler, er ábyrgur fyrir tímasetningu allra verkefna í Airflow. Raunar fjallar tímaáætlun um alla vélfræði við að setja verkefni fyrir framkvæmd. Verkefnið fer í gegnum nokkur stig áður en það er framkvæmt:

  1. Fyrri verkefni hafa verið unnin í DAG; hægt er að setja nýtt í biðröð.
  2. Röðinni er raðað eftir forgangi verkefna (einnig er hægt að stjórna forgangsröðun) og ef það er laus rifa í lauginni er hægt að taka verkefnið í notkun.
  3. Ef það er ókeypis vinnusellerí er verkefnið sent til þess; vinnan sem þú forritaðir í vandamálinu byrjar, með því að nota einn eða annan rekstraraðila.

Nógu einfalt.

Tímaáætlun keyrir á setti allra DAGs og allra verkefna innan DAGs.

Til þess að tímaáætlun geti byrjað að vinna með DAG þarf DAG að setja áætlun:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Það er sett af tilbúnum forstillingum: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Þú getur líka notað cron tjáningu:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Framkvæmdardagur

Til að skilja hvernig loftflæði virkar er mikilvægt að skilja hvað framkvæmdadagur er fyrir DAG. Í Airflow hefur DAG vídd framkvæmdardagsetningar, þ.e.a.s., allt eftir vinnuáætlun DAG, eru verktilvik búin til fyrir hvern framkvæmdardag. Og fyrir hvern framkvæmdardag er hægt að framkvæma verkefni aftur - eða til dæmis getur DAG unnið samtímis á nokkrum framkvæmdardögum. Þetta sést greinilega hér:

Airflow er tæki til að þróa og viðhalda lotugagnavinnsluferlum á þægilegan og fljótlegan hátt

Því miður (eða kannski sem betur fer: það fer eftir aðstæðum), ef framkvæmd verkefnisins í DAG er leiðrétt, mun framkvæmd á fyrri framkvæmdardegi halda áfram með hliðsjón af leiðréttingunum. Þetta er gott ef þú þarft að endurreikna gögn á liðnum tímabilum með því að nota nýtt reiknirit, en það er slæmt vegna þess að endurgeranleiki niðurstöðunnar tapast (auðvitað nennir enginn þér að skila nauðsynlegri útgáfu frumkóðans frá Git og reikna út hvað þú þarft einu sinni, eins og þú þarft það).

Að búa til verkefni

Útfærslan á DAG er kóða í Python, þannig að við höfum mjög þægilega leið til að minnka magn kóða þegar unnið er til dæmis með sharded sources. Segjum að þú sért með þrjú MySQL-brot sem uppspretta, þú þarft að klifra inn í hvern og ná í gögn. Þar að auki sjálfstætt og samhliða. Python kóðinn í DAG gæti litið svona út:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG lítur svona út:

Airflow er tæki til að þróa og viðhalda lotugagnavinnsluferlum á þægilegan og fljótlegan hátt

Í þessu tilviki geturðu bætt við eða fjarlægt brot með því einfaldlega að stilla stillingarnar og uppfæra DAG. Þægilegt!

Þú getur líka notað flóknari kóðagerð, til dæmis, unnið með heimildir í formi gagnagrunns eða lýst töflubyggingu, reiknirit til að vinna með töflu og, að teknu tilliti til eiginleika DWH innviða, búa til ferli til að hlaða N borðum inn í geymsluna þína. Eða, til dæmis, að vinna með API sem styður ekki að vinna með færibreytu í formi lista, þú getur búið til N verkefni í DAG úr þessum lista, takmarkað samsvörun beiðna í API við safn og skafað nauðsynleg gögn frá API. Sveigjanlegur!

geymsla

Airflow hefur sitt eigið bakendageymsla, gagnagrunn (getur verið MySQL eða Postgres, við höfum Postgres), sem geymir stöðu verkefna, DAG, tengistillingar, alþjóðlegar breytur osfrv., osfrv. Hér vil ég segja að repository í Airflow er mjög einfalt (um 20 töflur) og þægilegt ef þú vilt byggja eitthvað af þínum eigin ferlum ofan á það. Ég man eftir 100500 töflunum í Informatica geymslunni, sem þurfti að rannsaka í langan tíma áður en hægt var að skilja hvernig á að byggja upp fyrirspurn.

Eftirlit

Í ljósi einfaldleika geymslunnar geturðu byggt upp verkefnaeftirlitsferli sem hentar þér. Við notum skrifblokk í Zeppelin, þar sem við skoðum stöðu verkefna:

Airflow er tæki til að þróa og viðhalda lotugagnavinnsluferlum á þægilegan og fljótlegan hátt

Þetta gæti líka verið vefviðmót Airflow sjálfs:

Airflow er tæki til að þróa og viðhalda lotugagnavinnsluferlum á þægilegan og fljótlegan hátt

Loftflæðiskóðinn er opinn uppspretta, svo við höfum bætt viðvörun við Telegram. Hvert hlaupandi tilvik verkefnis, ef villa kemur upp, sendir hópinn ruslpóst í Telegram, þar sem allt þróunar- og stuðningsteymið samanstendur af.

Við fáum skjót viðbrögð í gegnum Telegram (ef þess er krafist) og í gegnum Zeppelin fáum við heildarmynd af verkefnum í Airflow.

Alls

Loftflæði er fyrst og fremst opinn uppspretta og þú ættir ekki að búast við kraftaverkum frá því. Vertu tilbúinn til að leggja á þig tíma og fyrirhöfn til að byggja upp lausn sem virkar. Markmiðið er náð, trúðu mér, það er þess virði. Þróunarhraði, sveigjanleiki, auðvelt að bæta við nýjum ferlum - þér líkar það. Auðvitað þarftu að borga mikla athygli að skipulagi verkefnisins, stöðugleika loftflæðisins sjálfs: kraftaverk gerast ekki.

Núna erum við með Airflow sem vinnur daglega um 6,5 þúsund verkefni. Þau eru nokkuð ólík í eðli sínu. Það eru verkefni að hlaða gögnum inn í aðal DWH frá mörgum mismunandi og mjög sérstökum aðilum, það eru verkefni að reikna út geymslurými inni í aðal DWH, það eru verkefni að birta gögn í hraðvirka DWH, það eru mörg, mörg mismunandi verkefni - og loftflæði tyggur þá alla upp dag eftir dag. Talandi í tölum, þetta er 2,3 þúsund ELT verkefni af mismunandi flóknum hætti innan DWH (Hadoop), u.þ.b. 2,5 hundruð gagnagrunnar heimildir, þetta er lið frá 4 ETL verktaki, sem skiptast í ETL gagnavinnslu í DWH og ELT gagnavinnslu innan DWH og auðvitað fleira einn admin, sem sér um innviði þjónustunnar.

Áætlanir fyrir framtíðina

Fjöldi ferla fer óumflýjanlega vaxandi og aðalatriðið sem við munum gera hvað varðar loftflæðisinnviði er að stækka. Við viljum byggja upp loftflæðisklasa, úthluta par af fótum fyrir starfsmenn sellerísins og búa til sjálfsafritandi höfuð með vinnuáætlunarferlum og geymslu.

Eftirmáli

Þetta er auðvitað ekki allt sem mig langar að segja um Airflow, en ég reyndi að draga fram aðalatriðin. Matarlyst fylgir því að borða, prófaðu það og þér líkar það :)

Heimild: www.habr.com

Bæta við athugasemd