Airflow er et verktøy for enkelt og raskt å utvikle og vedlikeholde batchdatabehandlingsprosesser

Airflow er et verktøy for enkelt og raskt å utvikle og vedlikeholde batchdatabehandlingsprosesser

Hei, Habr! I denne artikkelen vil jeg snakke om ett flott verktøy for å utvikle batchdatabehandlingsprosesser, for eksempel i infrastrukturen til en bedrifts DWH eller din DataLake. Vi vil snakke om Apache Airflow (heretter referert til som Airflow). Det er urettferdig fratatt oppmerksomhet på Habré, og i hoveddelen skal jeg prøve å overbevise deg om at i det minste Airflow er verdt å se på når du velger en planlegger for ETL/ELT-prosessene dine.

Tidligere skrev jeg en serie artikler om temaet DWH da jeg jobbet i Tinkoff Bank. Nå har jeg blitt en del av Mail.Ru Group-teamet og utvikler en plattform for dataanalyse på spillområdet. Faktisk, ettersom nyheter og interessante løsninger dukker opp, vil teamet mitt og jeg snakke her om plattformen vår for dataanalyse.

prologen

Så la oss begynne. Hva er luftstrøm? Dette er et bibliotek (eller sett med biblioteker) å utvikle, planlegge og overvåke arbeidsprosesser. Hovedtrekket til Airflow: Python-kode brukes til å beskrive (utvikle) prosesser. Dette har mange fordeler for organisering av prosjektet og utviklingen: i hovedsak er ditt (for eksempel) ETL-prosjekt bare et Python-prosjekt, og du kan organisere det som du ønsker, med tanke på spesifikasjonene til infrastrukturen, teamstørrelse og andre krav. Instrumentelt er alt enkelt. Bruk for eksempel PyCharm + Git. Det er fantastisk og veldig praktisk!

La oss nå se på hovedenhetene til Airflow. Ved å forstå deres essens og formål kan du organisere prosessarkitekturen din optimalt. Kanskje hovedenheten er den rettet asykliske grafen (heretter referert til som DAG).

DAG

En DAG er en meningsfull assosiasjon av oppgavene dine som du ønsker å fullføre i en strengt definert sekvens i henhold til en bestemt tidsplan. Airflow gir et praktisk webgrensesnitt for å jobbe med DAG-er og andre enheter:

Airflow er et verktøy for enkelt og raskt å utvikle og vedlikeholde batchdatabehandlingsprosesser

DAG kan se slik ut:

Airflow er et verktøy for enkelt og raskt å utvikle og vedlikeholde batchdatabehandlingsprosesser

Utvikleren, når han designer en DAG, fastsetter et sett med operatører som oppgaver innenfor DAG skal bygges på. Her kommer vi til en annen viktig enhet: Airflow Operator.

Operatører

En operatør er en enhet på grunnlag av hvilke jobbinstanser som opprettes, som beskriver hva som vil skje under utførelsen av en jobbinstans. Luftstrøm frigjøres fra GitHub inneholder allerede et sett med operatører klare til bruk. Eksempler:

  • BashOperator - operatør for å utføre en bash-kommando.
  • PythonOperator - operatør for å ringe Python-kode.
  • EmailOperator — operatør for sending av e-post.
  • HTTPOperator - operatør for arbeid med http-forespørsler.
  • SqlOperator - operatør for å utføre SQL-kode.
  • Sensor er en operatør for å vente på en hendelse (ankomsten av den nødvendige tiden, utseendet til den nødvendige filen, en linje i databasen, et svar fra API, etc., etc.).

Det er mer spesifikke operatører: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Du kan også utvikle operatører basert på dine egne egenskaper og bruke dem i prosjektet ditt. For eksempel opprettet vi MongoDBToHiveViaHdfsTransfer, en operatør for eksport av dokumenter fra MongoDB til Hive, og flere operatører for å jobbe med ClickHouse: CHLoadFromHiveOperator og CHTableLoaderOperator. I hovedsak, så snart et prosjekt ofte har brukt kode bygget på grunnleggende setninger, kan du tenke på å bygge den inn i en ny setning. Dette vil forenkle videre utvikling, og du vil utvide biblioteket med operatører i prosjektet.

Deretter må alle disse forekomstene av oppgaver utføres, og nå skal vi snakke om planleggeren.

Planlegger

Airflows oppgaveplanlegger er bygget på Selleri. Selleri er et Python-bibliotek som lar deg organisere en kø pluss asynkron og distribuert utførelse av oppgaver. På Airflow-siden er alle oppgaver delt inn i bassenger. Bassenger opprettes manuelt. Hensikten deres er vanligvis å begrense arbeidsmengden ved å jobbe med kilden eller å karakterisere oppgaver innenfor DWH. Bassenger kan administreres via nettgrensesnittet:

Airflow er et verktøy for enkelt og raskt å utvikle og vedlikeholde batchdatabehandlingsprosesser

Hvert basseng har en grense på antall spilleautomater. Når du oppretter en DAG, får den en pool:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

En pool definert på DAG-nivå kan overstyres på oppgavenivå.
En egen prosess, Scheduler, er ansvarlig for å planlegge alle oppgaver i Airflow. Faktisk håndterer Scheduler all mekanikken for å sette oppgaver for utførelse. Oppgaven går gjennom flere stadier før den utføres:

  1. De tidligere oppgavene er fullført i DAG, en ny kan settes i kø.
  2. Køen sorteres avhengig av prioritering av oppgaver (prioriteringer kan også styres), og dersom det er ledig plass i bassenget kan oppgaven tas i drift.
  3. Hvis det er en ledig arbeiderselleri, sendes oppgaven til den; arbeidet som du programmerte i oppgaven begynner, ved å bruke en eller annen operatør.

Enkelt nok.

Scheduler kjører på settet med alle DAG-er og alle oppgaver innenfor DAG-er.

For at Scheduler skal begynne å jobbe med DAG, må DAG angi en tidsplan:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Det er et sett med ferdige forhåndsinnstillinger: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Du kan også bruke cron-uttrykk:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Utførelsesdato

For å forstå hvordan Airflow fungerer, er det viktig å forstå hva utførelsesdato er for en DAG. I Airflow har DAG en utførelsesdato-dimensjon, det vil si at avhengig av DAGs arbeidsplan opprettes oppgaveforekomster for hver utførelsesdato. Og for hver utførelsesdato kan oppgaver utføres på nytt – eller for eksempel kan en DAG jobbe samtidig i flere utførelsesdatoer. Dette vises tydelig her:

Airflow er et verktøy for enkelt og raskt å utvikle og vedlikeholde batchdatabehandlingsprosesser

Dessverre (eller kanskje heldigvis: det avhenger av situasjonen), hvis implementeringen av oppgaven i DAG blir korrigert, vil utførelsen i forrige utførelsesdato fortsette med hensyn til justeringene. Dette er bra hvis du trenger å beregne data på nytt i tidligere perioder ved hjelp av en ny algoritme, men det er dårlig fordi reproduserbarheten til resultatet går tapt (selvfølgelig er det ingen som plager deg med å returnere den nødvendige versjonen av kildekoden fra Git og beregne hva du trenger én gang, slik du trenger den).

Generer oppgaver

Implementeringen av DAG er kode i Python, så vi har en veldig praktisk måte å redusere mengden kode på når vi for eksempel jobber med sharded kilder. La oss si at du har tre MySQL-skår som kilde, du må klatre inn i hver og plukke opp noen data. Dessuten uavhengig og parallelt. Python-koden i DAG kan se slik ut:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG ser slik ut:

Airflow er et verktøy for enkelt og raskt å utvikle og vedlikeholde batchdatabehandlingsprosesser

I dette tilfellet kan du legge til eller fjerne et shard ved å justere innstillingene og oppdatere DAG. Komfortabel!

Du kan også bruke mer kompleks kodegenerering, for eksempel arbeide med kilder i form av en database eller beskrive en tabellstruktur, en algoritme for arbeid med en tabell, og, tatt i betraktning funksjonene til DWH-infrastrukturen, generere en prosess for å laste inn N tabeller i lageret ditt. Eller, for eksempel, arbeider med et API som ikke støtter arbeid med en parameter i form av en liste, kan du generere N oppgaver i en DAG fra denne listen, begrense parallelliteten til forespørsler i APIen til en pool, og skrape nødvendige data fra API. Fleksibel!

oppbevaringssted

Airflow har sitt eget backend-lager, en database (kan være MySQL eller Postgres, vi har Postgres), som lagrer tilstandene til oppgaver, DAG-er, tilkoblingsinnstillinger, globale variabler osv. osv. Her vil jeg gjerne si at repository i Airflow er veldig enkelt (ca. 20 tabeller) og praktisk hvis du vil bygge noen av dine egne prosesser på toppen av det. Jeg husker de 100500 XNUMX tabellene i Informatica-depotet, som måtte studeres i lang tid før man forstår hvordan man bygger en spørring.

overvåking

Gitt enkelheten til depotet, kan du bygge en oppgaveovervåkingsprosess som er praktisk for deg. Vi bruker en notisblokk i Zeppelin, hvor vi ser på status for oppgaver:

Airflow er et verktøy for enkelt og raskt å utvikle og vedlikeholde batchdatabehandlingsprosesser

Dette kan også være webgrensesnittet til selve Airflow:

Airflow er et verktøy for enkelt og raskt å utvikle og vedlikeholde batchdatabehandlingsprosesser

Airflow-koden er åpen kildekode, så vi har lagt til varsling til Telegram. Hver kjørende forekomst av en oppgave, hvis det oppstår en feil, spammer gruppen i Telegram, der hele utviklings- og støtteteamet består.

Vi får raskt svar gjennom Telegram (hvis nødvendig), og gjennom Zeppelin får vi et samlet bilde av oppgaver i Airflow.

Totalt

Airflow er først og fremst åpen kildekode, og du bør ikke forvente mirakler fra det. Vær forberedt på å bruke tid og krefter på å bygge en løsning som fungerer. Målet er oppnåelig, tro meg, det er verdt det. Utviklingshastighet, fleksibilitet, enkel å legge til nye prosesser - du vil like det. Selvfølgelig må du betale mye oppmerksomhet til organiseringen av prosjektet, stabiliteten til selve luftstrømmen: mirakler skjer ikke.

Nå har vi Airflow som jobber daglig ca 6,5 ​​tusen oppgaver. De er ganske forskjellige i karakter. Det er oppgaver med å laste data inn i hoved-DWH fra mange forskjellige og veldig spesifikke kilder, det er oppgaver med å beregne butikkfronter inne i hoved-DWH, det er oppgaver med å publisere data til en rask DWH, det er mange, mange forskjellige oppgaver - og Airflow tygger dem alle opp dag etter dag. Snakker i tall, dette er 2,3 tusen ELT-oppgaver av varierende kompleksitet innen DWH (Hadoop), ca. 2,5 hundre databaser kilder, er dette et team fra 4 ETL-utviklere, som er delt inn i ETL-databehandling i DWH og ELT-databehandling i DWH og selvfølgelig mer én admin, som tar seg av infrastrukturen til tjenesten.

Planer for fremtiden

Antall prosesser vokser uunngåelig, og det viktigste vi skal gjøre når det gjelder Airflow-infrastrukturen er skalering. Vi ønsker å bygge en Airflow-klynge, tildele et par ben til Selleri-arbeidere og lage et selvduplikerende hode med jobbplanleggingsprosesser og et depot.

Epilog

Dette er selvfølgelig ikke alt jeg vil fortelle om Airflow, men jeg prøvde å fremheve hovedpoengene. Matlyst følger med å spise, prøv det og du vil like det :)

Kilde: www.habr.com

Legg til en kommentar