Airflow on työkalu erätietojen käsittelyprosessien kätevään ja nopeaan kehittämiseen ja ylläpitoon

Airflow on työkalu erätietojen käsittelyprosessien kätevään ja nopeaan kehittämiseen ja ylläpitoon

Hei, Habr! Tässä artikkelissa haluan puhua yhdestä loistavasta työkalusta erätietojen käsittelyprosessien kehittämiseen esimerkiksi yrityksen DWH:n tai DataLaken infrastruktuurissa. Puhumme Apache Airflowsta (jäljempänä Airflow). Se on epäoikeudenmukaisesti riistetty Habrén huomiosta, ja pääosin yritän vakuuttaa sinut siitä, että ainakin Airflow kannattaa katsoa valittaessa ajastinta ETL/ELT-prosesseihisi.

Aiemmin kirjoitin sarjan artikkeleita DWH-aiheesta työskennellessäni Tinkoff Bankissa. Nyt olen liittynyt osaksi Mail.Ru Group -tiimiä ja olen kehittämässä alustaa data-analyysille pelialalla. Itse asiassa, kun uutisia ja mielenkiintoisia ratkaisuja ilmaantuu, tiimini ja minä puhumme täällä data-analytiikan alustasta.

prologi

Joten aloitetaan. Mikä on ilmavirtaus? Tämä on kirjasto (tai joukko kirjastoja) kehittää, suunnitella ja seurata työprosesseja. Airflown pääominaisuus: Python-koodia käytetään kuvaamaan (kehittämään) prosesseja. Tästä on paljon etuja projektin ja kehityksen organisoinnissa: pohjimmiltaan (esimerkiksi) ETL-projektisi on vain Python-projekti, ja voit järjestää sen haluamallasi tavalla ottaen huomioon infrastruktuurin erityispiirteet, tiimin koon ja muut vaatimukset. Instrumentaalisesti kaikki on yksinkertaista. Käytä esimerkiksi PyCharm + Git. Se on upea ja erittäin kätevä!

Katsotaanpa nyt Airflown pääkokonaisuuksia. Ymmärtämällä niiden olemuksen ja tarkoituksen voit järjestää prosessiarkkitehtuurisi optimaalisesti. Ehkä pääkokonaisuus on suunnattu asyklinen graafi (jäljempänä DAG).

PÄIVÄ

DAG on jokin mielekäs yhdistelmä tehtävistäsi, jotka haluat suorittaa tiukasti määritellyssä järjestyksessä tietyn aikataulun mukaisesti. Airflow tarjoaa kätevän verkkokäyttöliittymän DAG:iden ja muiden yksiköiden kanssa työskentelemiseen:

Airflow on työkalu erätietojen käsittelyprosessien kätevään ja nopeaan kehittämiseen ja ylläpitoon

DAG voi näyttää tältä:

Airflow on työkalu erätietojen käsittelyprosessien kätevään ja nopeaan kehittämiseen ja ylläpitoon

Suunnitellessaan DAG:ta kehittäjä määrittelee joukon operaattoreita, joiden varaan DAG:n tehtävät rakennetaan. Tässä pääsemme toiseen tärkeään kokonaisuuteen: Airflow Operator.

toimijoiden

Operaattori on entiteetti, jonka perusteella työinstanssit luodaan ja joka kuvaa mitä tapahtuu työinstanssin suorituksen aikana. Airflow vapautuu GitHubista sisältää jo joukon operaattoreita, jotka ovat valmiita käyttöön. Esimerkkejä:

  • BashOperator - operaattori bash-komennon suorittamiseen.
  • PythonOperator - operaattori Python-koodin kutsumiseen.
  • EmailOperator — sähköpostin lähettämiseen tarkoitettu operaattori.
  • HTTPOperator - operaattori http-pyyntöjen kanssa työskentelemiseen.
  • SqlOperator - operaattori SQL-koodin suorittamiseen.
  • Anturi on operaattori tapahtuman odottamiseen (vaaditun ajan saapuminen, vaaditun tiedoston ilmestyminen, tietokannan rivi, API vastaus jne. jne.).

On olemassa tarkempia operaattoreita: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Voit myös kehittää operaattoreita omien ominaisuuksiisi perustuen ja käyttää niitä projektissasi. Loimme esimerkiksi MongoDBToHiveViaHdfsTransferin, operaattorin asiakirjojen viemiseen MongoDB:stä Hiveen, ja useita operaattoreita työskennelläkseen Napsauta taloa: CHLoadFromHiveOperator ja CHTableLoaderOperator. Pohjimmiltaan, heti kun projektissa on usein käytetty peruskäskyihin rakennettua koodia, voit miettiä sen rakentamista uudeksi lauseeksi. Tämä yksinkertaistaa jatkokehitystä ja laajennat operaattorikirjastoasi projektissa.

Seuraavaksi kaikki nämä tehtävien esiintymät on suoritettava, ja nyt puhumme ajoittimesta.

Ajastin

Airflown tehtävien ajoitus on rakennettu Selleri. Celery on Python-kirjasto, jonka avulla voit järjestää jonon sekä asynkronisen ja hajautetun tehtävien suorittamisen. Airflow-puolella kaikki tehtävät on jaettu pooleihin. Altaat luodaan manuaalisesti. Tyypillisesti niiden tarkoitus on rajoittaa lähteen kanssa työskentelyn työmäärää tai tyypillistä tehtäviä DWH:n sisällä. Pooleja voidaan hallita verkkokäyttöliittymän kautta:

Airflow on työkalu erätietojen käsittelyprosessien kätevään ja nopeaan kehittämiseen ja ylläpitoon

Jokaisella poolilla on rajoitus kolikkopelien lukumäärälle. Kun luot DAG:n, sille annetaan pooli:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

DAG-tasolla määritetty pooli voidaan ohittaa tehtävätasolla.
Erillinen prosessi, Scheduler, vastaa kaikkien Airflown tehtävien ajoittamisesta. Itse asiassa Scheduler käsittelee kaikkia suoritettavien tehtävien asettamisen mekaniikkaa. Tehtävä käy läpi useita vaiheita ennen suorittamista:

  1. Aiemmat tehtävät on suoritettu DAG:ssa, uusi voidaan laittaa jonoon.
  2. Jono lajitellaan tehtävien tärkeysjärjestyksen mukaan (prioriteetit ovat myös ohjattavissa), ja jos poolissa on vapaata paikkaa, tehtävä voidaan ottaa käyttöön.
  3. Jos on vapaa työntekijä selleri, tehtävä lähetetään sille; työ, jonka ohjelmoit ongelmaan, alkaa käyttämällä yhtä tai toista operaattoria.

Tarpeeksi yksinkertainen.

Scheduler toimii kaikkien DAG:iden ja kaikkien DAG:ien tehtävien sarjassa.

Jotta Scheduler voi aloittaa työskentelyn DAG:n kanssa, DAG:n on asetettava aikataulu:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Siellä on joukko valmiita esiasetuksia: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Voit myös käyttää cron-lausekkeita:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Toteuttamispäivää

Ilmavirran toiminnan ymmärtämiseksi on tärkeää ymmärtää, mikä on DAG:n suorituspäivä. Airflow:ssa DAG:lla on Suorituspäivä-ulottuvuus, eli DAG:n työaikataulusta riippuen jokaiselle suorituspäivämäärälle luodaan tehtäväinstanssit. Ja jokaiselle toteutuspäivälle tehtävät voidaan suorittaa uudelleen - tai esimerkiksi DAG voi työskennellä samanaikaisesti useana suorituspäivänä. Tämä näkyy selvästi tässä:

Airflow on työkalu erätietojen käsittelyprosessien kätevään ja nopeaan kehittämiseen ja ylläpitoon

Valitettavasti (tai ehkä onneksi: riippuu tilanteesta), jos tehtävän toteutus DAG:ssa korjataan, niin suoritus edellisellä suorituspäivällä etenee säädöt huomioiden. Tämä on hyvä, jos joudut laskemaan aiempien kausien tiedot uudelleen uudella algoritmilla, mutta se on huono, koska tuloksen toistettavuus menetetään (tietenkään kukaan ei vaivaudu palauttamaan vaadittua lähdekoodin versiota Gitistä ja laskemaan mitä tarvitset kerran, niin kuin tarvitset).

Tehtävien luominen

DAG:n toteutus on Python-koodia, joten meillä on erittäin kätevä tapa vähentää koodin määrää työskennellessämme esimerkiksi sirpaloitujen lähteiden kanssa. Oletetaan, että sinulla on lähteenä kolme MySQL-sirpaletta, sinun täytyy kiivetä jokaiseen ja poimia tietoja. Lisäksi itsenäisesti ja rinnakkain. Python-koodi DAG:ssa voi näyttää tältä:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG näyttää tältä:

Airflow on työkalu erätietojen käsittelyprosessien kätevään ja nopeaan kehittämiseen ja ylläpitoon

Tässä tapauksessa voit lisätä tai poistaa sirpaleen yksinkertaisesti säätämällä asetuksia ja päivittämällä DAG:n. Mukava!

Voit myös käyttää monimutkaisempaa koodin generointia, esimerkiksi työskennellä lähteiden kanssa tietokannan muodossa tai kuvata taulukkorakennetta, algoritmia taulukon kanssa työskentelyyn ja DWH-infrastruktuurin ominaisuudet huomioon ottaen luoda prosessin. N taulukon lataamiseen varastoosi. Tai esimerkiksi työskennellessäsi API:lla, joka ei tue työskentelyä parametrin kanssa luettelon muodossa, voit luoda N tehtävää DAG:ssa tästä luettelosta, rajoittaa sovellusliittymän pyyntöjen rinnakkaisuuden pooliin ja kaapia tarvittavat tiedot API:lta. Joustava!

arkisto

Airflowlla on oma taustatietovarasto, tietokanta (voi olla MySQL tai Postgres, meillä on Postgres), joka tallentaa tehtävien tilat, DAG:t, yhteysasetukset, globaalit muuttujat jne., jne. Tässä haluan sanoa, että Airflow-varasto on hyvin yksinkertainen (noin 20 taulukkoa) ja kätevä, jos haluat rakentaa sen päälle omia prosessejasi. Muistan Informatican arkiston 100500 XNUMX taulukkoa, joita piti tutkia pitkään ennen kuin ymmärsimme kyselyn rakentamisen.

seuranta

Arkiston yksinkertaisuuden vuoksi voit rakentaa sinulle sopivan tehtävien seurantaprosessin. Käytämme Zeppelinissä muistilehteä, jossa tarkastellaan tehtävien tilaa:

Airflow on työkalu erätietojen käsittelyprosessien kätevään ja nopeaan kehittämiseen ja ylläpitoon

Tämä voisi olla myös itse Airflown verkkokäyttöliittymä:

Airflow on työkalu erätietojen käsittelyprosessien kätevään ja nopeaan kehittämiseen ja ylläpitoon

Airflow-koodi on avoimen lähdekoodin, joten olemme lisänneet hälytyksen Telegramiin. Jokainen käynnissä oleva tehtävän esiintymä, jos tapahtuu virhe, lähettää roskapostin Telegramin ryhmään, jossa koko kehitys- ja tukitiimi koostuu.

Saamme nopean vastauksen Telegramin kautta (tarvittaessa), ja Zeppelinin kautta saamme kokonaiskuvan Airflown tehtävistä.

Yhteensä

Airflow on ensisijaisesti avoimen lähdekoodin lähde, ja sinun ei pitäisi odottaa siltä ihmeitä. Ole valmis käyttämään aikaa ja vaivaa toimivan ratkaisun rakentamiseen. Tavoite on saavutettavissa, usko minua, se on sen arvoista. Kehityksen nopeus, joustavuus, uusien prosessien lisäämisen helppous - pidät siitä. Tietenkin sinun on kiinnitettävä paljon huomiota projektin organisointiin, itse Airflown vakauteen: ihmeitä ei tapahdu.

Nyt Airflow toimii päivittäin noin 6,5 tuhatta tehtävää. He ovat luonteeltaan melko erilaisia. On tehtäviä ladata dataa pää-DWH:hen monista erilaisista ja hyvin spesifisistä lähteistä, on tehtäviä laskea julkisivuja pää-DWH:n sisällä, on tehtäviä julkaista dataa nopeaan DWH:hen, on monia, monia erilaisia ​​tehtäviä - ja Airflow pureskelee niitä kaikkia päivästä toiseen. Numeroissa puhuttaessa näin on Xnumx tuhat ELT-tehtävät vaihtelevan monimutkaisen DWH:n (Hadoop) sisällä, noin. 2,5 sataa tietokantaa lähteistä, tämä on tiimi 4 ETL-kehittäjää, jotka on jaettu ETL-tietojen käsittelyyn DWH:ssa ja ELT-tiedonkäsittelyyn DWH:ssa ja tietysti muuhun yksi admin, joka käsittelee palvelun infrastruktuuria.

Tulevaisuuden suunnitelmat

Prosessien määrä kasvaa väistämättä, ja tärkein asia, jota teemme Airflow-infrastruktuurin suhteen, on skaalautuminen. Haluamme rakentaa Airflow-klusterin, varata jalkaparin sellerityöntekijöille ja tehdä itsekopioivan pään, jossa on työn aikataulutusprosesseja ja arkiston.

Epilogi

Tämä ei tietenkään ole kaikki, mitä haluaisin kertoa Airflowsta, mutta yritin korostaa pääkohtia. Ruokahalu tulee syödessä, kokeile ja pidät siitä :)

Lähde: will.com

Lisää kommentti