Apache Airflow: ETL-i lihtsustamine

Tere, mina olen Dmitri Logvinenko – Vezeti ettevõtete grupi analüüsiosakonna andmeinsener.

Räägin teile suurepärasest tööriistast ETL protsesside arendamiseks - Apache Airflow. Kuid Airflow on nii mitmekülgne ja mitmetahuline, et peaksite seda lähemalt uurima ka siis, kui te pole andmevoogudega seotud, kuid teil on vajadus perioodiliselt mingeid protsesse käivitada ja nende täitmist jälgida.

Ja jah, ma mitte ainult ei ütle, vaid ka näitan: programmis on palju koodi, ekraanipilte ja soovitusi.

Apache Airflow: ETL-i lihtsustamine
Mida tavaliselt näete, kui googeldate sõna Airflow / Wikimedia Commons

Sisukord

Sissejuhatus

Apache Airflow on täpselt nagu Django:

  • kirjutatud pythonis
  • seal on suurepärane administraatoripaneel,
  • piiramatult laiendatav

- ainult parem, ja see tehti täiesti erinevatel eesmärkidel, nimelt (nagu kati ees on kirjutatud):

  • ülesannete käitamine ja jälgimine piiramatul arvul masinatel (nii palju kui palju sellerit/kuberneteid ja teie südametunnistus lubavad)
  • dünaamilise töövoo genereerimisega Pythoni koodist, mida on väga lihtne kirjutada ja mõista
  • ja võimalus ühendada omavahel mis tahes andmebaase ja API-sid, kasutades nii valmiskomponente kui ka kodus valmistatud pluginaid (mis on äärmiselt lihtne).

Me kasutame Apache Airflow'i järgmiselt:

  • kogume andmeid erinevatest allikatest (paljud SQL Serveri ja PostgreSQL-i eksemplarid, erinevad API-d koos rakenduste mõõdikutega, isegi 1C) DWH-s ja ODS-is (meil on Vertica ja Clickhouse).
  • kui arenenud cron, mis käivitab ODS-i andmete koondamise protsessid ja jälgib ka nende hooldust.

Kuni viimase ajani kattis meie vajadused üks väike server 32 tuuma ja 50 GB muutmäluga. Airflow puhul töötab see:

  • rohkem 200 päeva (tegelikult töövood, millesse me ülesandeid toppisime),
  • igas keskmiselt 70 ülesannet,
  • see headus algab (ka keskmiselt) kord tunnis.

Ja sellest, kuidas me laienesime, kirjutan allpool, kuid nüüd määratleme überprobleemi, mille me lahendame:

Algupäraseid SQL-servereid on kolm, millest igaühel on 50 andmebaasi - vastavalt ühe projekti eksemplarid, neil on sama struktuur (peaaegu igal pool, mua-ha-ha), mis tähendab, et igal neist on Tellimuste tabel (õnneks tabel sellega nime saab igasse ärisse suruda). Me võtame andmed, lisades teenuseväljad (allikaserver, lähteandmebaas, ETL-i ülesande ID) ja viskame need naiivselt näiteks Verticasse.

Mine!

Põhiosa, praktiline (ja natuke teoreetiline)

Miks meie (ja teie)

Kui puud olid suured ja mina olin lihtne SQL-schik ühes Venemaa jaemüügis petsime ETL-i protsesse ehk andmevooge, kasutades kahte meile saadaolevat tööriista:

  • Informaatika toitekeskus - äärmiselt laialivalguv süsteem, äärmiselt produktiivne, oma riistvaraga, oma versioonidega. Kasutasin jumal hoidku 1% selle võimalustest. Miks? Noh, esiteks avaldas see kuskil 380ndatest pärit liides meile vaimselt survet. Teiseks on see tööriist mõeldud ülimalt uhketeks protsessideks, raevukaks komponentide taaskasutamiseks ja muudeks väga tähtsateks ettevõtte nippideks. Selle kohta, et see maksab, nagu Airbus AXNUMX tiib aastas, ei ütle me midagi.

    Ettevaatust, ekraanipilt võib alla 30-aastastele inimestele pisut haiget teha

    Apache Airflow: ETL-i lihtsustamine

  • SQL Serveri integratsiooniserver - kasutasime seda seltsimeest oma projektisisestes voogudes. Noh, tegelikult: me kasutame juba SQL Serverit ja oleks kuidagi ebamõistlik mitte kasutada selle ETL-i tööriistu. Kõik selles on hea: nii liides on ilus kui ka edenemise aruanded ... Kuid see pole põhjus, miks me tarkvaratooteid armastame, oh, mitte selle pärast. Versioon see dtsx (mis on salvestamisel segatud sõlmedega XML) saame, aga mis mõte sellel on? Kuidas oleks teha ülesannete pakett, mis lohistaks sadu tabeleid ühest serverist teise? Jah, mis sada, nimetissõrm kukub hiirenupule klõpsates kahekümnest tükist maha. Kuid see näeb kindlasti moodsam välja:

    Apache Airflow: ETL-i lihtsustamine

Kindlasti otsisime väljapääse. Juhtum isegi peaaegu jõudis ise kirjutatud SSIS-i paketigeneraatorini ...

…ja siis leidis mind uus töökoht. Ja Apache Airflow möödus minust sellel.

Kui sain teada, et ETL-i protsesside kirjeldused on lihtne Pythoni kood, siis ma lihtsalt ei tantsinud rõõmust. Nii sai andmevooge versioonitud ja diferentseeritud ning ühe struktuuriga tabelite valamine sadadest andmebaasidest ühte sihtmärki sai Pythoni koodi asjaks pooleteise või kahe 13-tollise ekraaniga.

Klastri kokkupanek

Ärgem korraldagem täiesti lasteaeda ja ärgem rääkigem siin täiesti ilmsetest asjadest, nagu Airflow installimine, teie valitud andmebaas, seller ja muud dokkides kirjeldatud juhtumid.

Et saaksime kohe katseid alustada, visandasin docker-compose.yml milles:

  • Tõstame tegelikult Õhuvool: ajakava, veebiserver. Flower hakkab seal ka keerlema, et jälgida selleri ülesandeid (sest see on juba sisse lükatud apache/airflow:1.10.10-python3.7, aga meil pole selle vastu midagi)
  • PostgreSQL, kuhu Airflow kirjutab oma teenuseteabe (plaanija andmed, täitmisstatistika jne) ja Selery märgib lõpetatud ülesanded;
  • Redis, mis tegutseb selleri tööülesannete vahendajana;
  • Selleri töötaja, mis hakkab tegelema ülesannete otsese täitmisega.
  • Kausta ./dags lisame oma failid dagsi kirjeldusega. Need korjatakse üles lennult, nii et pärast iga aevastamist pole vaja kogu virnaga žongleerida.

Mõnes kohas pole näidetes olev kood lõpuni välja toodud (et mitte teksti segamini ajada), kuid kuskil muudetakse seda protsessi käigus. Täielikud töötavad koodinäited leiate hoidlast https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Märkused:

  • Kompositsiooni kokkupanemisel toetusin suuresti tuntud kuvandile puckel/docker-õhuvool - vaadake seda kindlasti. Võib-olla polegi midagi muud oma ellu vaja.
  • Kõik õhuvoolu seaded on saadaval mitte ainult läbi airflow.cfg, aga ka keskkonnamuutujate kaudu (tänu arendajatele), mida ma pahatahtlikult ära kasutasin.
  • Loomulikult pole see tootmisvalmis: ma ei pannud meelega konteineritele südamelööke, ei vaevanud end turvalisusega. Aga tegin meie katsetajatele sobiva miinimumi.
  • Pange tähele, et:
    • Kaust dag peab olema juurdepääsetav nii planeerijale kui ka töötajatele.
    • Sama kehtib kõigi kolmandate osapoolte teekide kohta – need kõik peavad olema installitud ajakava ja töötajatega masinatesse.

Noh, nüüd on kõik lihtne:

$ docker-compose up --scale worker=3

Kui kõik on tõusnud, saate vaadata veebiliideseid:

Põhimõisted

Kui te kõigist nendest "päevadest" midagi aru ei saanud, siis siin on lühike sõnastik:

  • Scheduler - Airflow'i kõige olulisem onu, kes kontrollib, et robotid töötaksid kõvasti, mitte inimene: jälgib ajakava, värskendab päevakirju, käivitab ülesandeid.

    Üldiselt oli vanemates versioonides tal probleeme mäluga (ei, mitte amneesia, vaid lekked) ja pärandparameeter jäi isegi konfiguratsioonidesse. run_duration — selle taaskäivitamise intervall. Aga nüüd on kõik hästi.

  • DAG (teise nimega "dag") - "suunatud atsükliline graafik", kuid selline määratlus ütleb vähestele inimestele, kuid tegelikult on see üksteisega suhtlevate ülesannete konteiner (vt allpool) või SSIS-i paketi ja Informatica töövoo analoog .

    Lisaks dagidele võib siiski olla ka alamdage, kuid suure tõenäosusega me nendeni ei jõua.

  • DAG Jooks - initsialiseeritud dag, millele on määratud oma execution_date. Sama dag'i dagranid võivad töötada paralleelselt (muidugi, kui olete oma ülesanded idempotentseks muutnud).
  • operaator on kooditükid, mis vastutavad konkreetse toimingu sooritamise eest. Operaatoreid on kolme tüüpi:
    • tegevusnagu meie lemmik PythonOperator, mis suudab käivitada mis tahes (kehtiva) Pythoni koodi;
    • üle, mis transpordivad andmeid ühest kohast teise, näiteks MsSqlToHiveTransfer;
    • andur teisest küljest võimaldab see reageerida või aeglustada dag edasist täitmist kuni sündmuse toimumiseni. HttpSensor saab määratud lõpp-punkti tõmmata ja kui soovitud vastus ootab, alustage edastamist GoogleCloudStorageToS3Operator. Uudishimulik meel küsib: "Miks? Kordusi saab ju otse operaatoris teha!” Ja siis, et mitte ummistada peatatud operaatoritega ülesannete kogumit. Andur käivitub, kontrollib ja sureb enne järgmist katset.
  • Ülesanne - deklareeritud operaatorid, olenemata tüübist ja lisatud dag, ülendatakse ülesande auastmele.
  • ülesande eksemplar - kui üldplaneerija otsustas, et on aeg ülesanded esitajatele-töölistele lahingusse saata (kohe kohapeal, kui kasutame LocalExecutor või kaugsõlme puhul CeleryExecutor), määrab see neile konteksti (st muutujate komplekti – täitmisparameetrid), laiendab käsu- või päringumalle ja koondab need kokku.

Loome ülesandeid

Esmalt visandame oma dougi üldise skeemi ja seejärel sukeldume üha enam detailidesse, sest rakendame mõningaid mittetriviaalseid lahendusi.

Nii et kõige lihtsamal kujul näeb selline dag välja järgmine:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Selgitame:

  • Esiteks impordime vajalikud libid ja midagi muud;
  • sql_server_ds - Kas List[namedtuple[str, str]] Airflow Connectionsi ühenduste nimedega ja andmebaasidega, kust me oma plaadi võtame;
  • dag - meie päevakuulutus, mis peab tingimata sees olema globals(), muidu Airflow seda ei leia. Doug peab ka ütlema:
    • mis ta nimi on orders - see nimi kuvatakse seejärel veebiliideses,
    • et ta töötab kaheksanda juuli südaööst,
    • ja see peaks käima umbes iga 6 tunni järel (kõvade meeste jaoks siin selle asemel timedelta() vastuvõetav cron- rida 0 0 0/6 ? * * *, vähem lahedatele - väljend nagu @daily);
  • workflow() teeb põhitöö ära, aga mitte praegu. Praegu lihtsalt heidame oma konteksti logisse.
  • Ja nüüd ülesannete loomise lihtne võlu:
    • jookseme läbi oma allikate;
    • initsialiseerida PythonOperator, mis hukkab meie mannekeeni workflow(). Ärge unustage määrata ülesande kordumatut (dag-i sees) nime ja siduda dag ise. Lipp provide_context omakorda lisab funktsiooni täiendavaid argumente, mille abil me hoolikalt kogume **context.

Praeguseks on see kõik. Mida me saime:

  • uus dag veebiliideses,
  • poolteist sada ülesannet, mida täidetakse paralleelselt (kui Airflow, Celery seaded ja serveri maht seda võimaldavad).

Noh, peaaegu sain aru.

Apache Airflow: ETL-i lihtsustamine
Kes paigaldab sõltuvused?

Kogu selle asja lihtsustamiseks keerasin asja sisse docker-compose.yml töötlemine requirements.txt kõigil sõlmedel.

Nüüd on see kadunud:

Apache Airflow: ETL-i lihtsustamine

Hallid ruudud on plaanija poolt töödeldud ülesannete eksemplarid.

Ootame veidi, ülesanded napsavad töölised:

Apache Airflow: ETL-i lihtsustamine

Rohelised on loomulikult oma töö edukalt lõpetanud. Punased ei ole eriti edukad.

Muide, meie tootel pole kausta ./dags, masinate vahel ei ole sünkroonimist – kõik dagid peituvad git meie Gitlabis ja Gitlab CI jagab liitumisel masinatele värskendusi master.

Natuke Lillest

Samal ajal kui töömehed meie lutte peksavad, meenutagem veel üht tööriista, mis võib meile midagi näidata – Lille.

Kõige esimene leht töötajate sõlmede kokkuvõtliku teabega:

Apache Airflow: ETL-i lihtsustamine

Kõige ägedam leht töösse läinud ülesannetega:

Apache Airflow: ETL-i lihtsustamine

Kõige igavam leht meie maakleri staatusega:

Apache Airflow: ETL-i lihtsustamine

Eredaim leht on ülesannete olekugraafikute ja nende täitmise ajaga:

Apache Airflow: ETL-i lihtsustamine

Laadime alakoormatud

Niisiis, kõik ülesanded on lahendatud, võite haavatud ära viia.

Apache Airflow: ETL-i lihtsustamine

Ja haavatuid oli palju – ühel või teisel põhjusel. Airflow õige kasutamise korral näitavad just need ruudud, et andmed kindlasti kohale ei jõudnud.

Peate jälgima logi ja taaskäivitama langenud ülesannete eksemplare.

Klõpsates mis tahes ruudul, näeme meile saadaolevaid toiminguid:

Apache Airflow: ETL-i lihtsustamine

Sa võid võtta ja teha Clear langenud. See tähendab, et unustame, et seal on midagi ebaõnnestunud, ja sama eksemplari ülesanne läheb planeerijasse.

Apache Airflow: ETL-i lihtsustamine

Selge on see, et kõigi punaste ruutudega hiirega seda teha ei ole kuigi humaanne – seda me Airflow’lt ei oota. Loomulikult on meil massihävitusrelvi: Browse/Task Instances

Apache Airflow: ETL-i lihtsustamine

Valime kõik korraga ja lähtestame nullile, klõpsake õiget üksust:

Apache Airflow: ETL-i lihtsustamine

Peale puhastamist näevad meie taksod välja sellised (nad juba ootavad, et sõiduplaani koostaja need sõiduplaani koostaks):

Apache Airflow: ETL-i lihtsustamine

Ühendused, konksud ja muud muutujad

On aeg vaadata järgmist DAG-i, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Kas kõik on kunagi aruannet värskendanud? See on jälle tema: seal on nimekiri allikatest, kust andmeid hankida; on nimekiri, kuhu panna; ärge unustage helinata, kui kõik juhtus või katki läks (noh, see ei puuduta meid, ei).

Vaatame faili uuesti läbi ja vaatame uusi ebaselgeid asju:

  • from commons.operators import TelegramBotSendMessage - miski ei takista meil oma operaatoreid tegemast, mida kasutasime ära, tehes Unblocked sõnumite saatmiseks väikese ümbrise. (Sellest operaatorist räägime lähemalt allpool);
  • default_args={} - dag saab jagada samu argumente kõigile oma operaatoritele;
  • to='{{ var.value.all_the_kings_men }}' - väli to meil ei ole kõvakodeeritud, vaid dünaamiliselt genereeritud, kasutades Jinjat ja muutujat koos e-kirjade loendiga, mille ma hoolikalt sisestasin Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — operaatori käivitamise tingimus. Meie puhul lendab kiri ülemustele alles siis, kui kõik sõltuvused on korda läinud edukalt;
  • tg_bot_conn_id='tg_main' - argumendid conn_id aktsepteerige meie loodud ühenduse ID-sid Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Telegrami sõnumid lendavad minema ainult siis, kui on langenud ülesandeid;
  • task_concurrency=1 - keelame ühe ülesande mitme ülesande eksemplari samaaegse käivitamise. Vastasel juhul käivitame korraga mitu VerticaOperator (vaatab ühte lauda);
  • report_update >> [email, tg] - kõik VerticaOperator koonduvad kirjade ja sõnumite saatmisel, näiteks järgmiselt:
    Apache Airflow: ETL-i lihtsustamine

    Kuid kuna teavitajate operaatoritel on erinevad käivitamistingimused, töötab ainult üks. Puuvaates näeb kõik veidi vähem visuaalne:
    Apache Airflow: ETL-i lihtsustamine

Ma ütlen paar sõna selle kohta makrod ja nende sõbrad - muutujad.

Makrod on Jinja kohahoidjad, mis võivad operaatori argumentidesse mitmesuguse kasuliku teabe asendada. Näiteks nii:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} laieneb kontekstimuutuja sisule execution_date vormingus YYYY-MM-DD: 2020-07-14. Parim osa on see, et kontekstimuutujad naelutatakse konkreetse ülesande eksemplari külge (puuvaates ruut) ja taaskäivitamisel laienevad kohahoidjad samadele väärtustele.

Määratud väärtusi saab vaadata iga ülesande eksemplari nupu Renderdatud abil. Kirja saatmise ülesanne on järgmine:

Apache Airflow: ETL-i lihtsustamine

Ja nii sõnumi saatmise ülesande juures:

Apache Airflow: ETL-i lihtsustamine

Siit leiate uusima saadaoleva versiooni sisseehitatud makrode täieliku loendi: makrode viide

Pealegi saame pistikprogrammide abil oma makrosid deklareerida, aga see on juba teine ​​lugu.

Lisaks eelmääratletud asjadele saame asendada oma muutujate väärtused (kasutasin seda juba ülalolevas koodis). Loome sisse Admin/Variables paar asja:

Apache Airflow: ETL-i lihtsustamine

Kõik, mida saate kasutada:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Väärtus võib olla skalaar või ka JSON. JSON-i puhul:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

kasutage lihtsalt soovitud võtme teed: {{ var.json.bot_config.bot.token }}.

Ütlen sõna otseses mõttes ühe sõna ja näitan selle kohta üht ekraanipilti ühendused. Siin on kõik elementaarne: lehel Admin/Connections loome ühenduse, lisame sinna oma sisselogimised/paroolid ja täpsemad parameetrid. Nagu nii:

Apache Airflow: ETL-i lihtsustamine

Paroole saab krüpteerida (vaikimisi põhjalikumalt) või ühenduse tüübi välja jätta (nagu ma tegin tg_main) - tõsiasi on see, et tüüpide loend on Airflow mudelites kõvasti ühendatud ja seda ei saa laiendada ilma lähtekoodidesse sisenemata (kui ma äkki midagi googeldades ei leidnud, parandage mind), kuid miski ei takista meil krediite saamast. nimi.

Sama nimega saate luua ka mitu ühendust: antud juhul meetod BaseHook.get_connection(), mis toob meile nimepidi ühendused, annab juhuslik mitmelt nimekaimult (loogilisem oleks teha Round Robin, aga jätame selle Airflow arendajate südametunnistusele).

Muutujad ja ühendused on kindlasti lahedad tööriistad, kuid oluline on mitte kaotada tasakaalu: milliseid oma voogude osi salvestate koodi endasse ja millised osad annate Airflow'le talletamiseks. Ühest küljest võib kasutajaliidese kaudu olla mugav väärtust, näiteks postkasti, kiiresti muuta. Teisest küljest on see ikkagi tagasipöördumine hiireklõpsu juurde, millest me (mina) tahtsime vabaneda.

Töö seostega on üks ülesandeid konksud. Üldiselt on Airflow konksud punktid selle ühendamiseks kolmandate osapoolte teenuste ja raamatukogudega. Nt, JiraHook avab meile Jiraga suhtlemiseks kliendi (saate ülesandeid edasi-tagasi liigutada) ja abiga SambaHook saate kohaliku faili edasi lükata smb-punkt.

Kohandatud operaatori sõelumine

Ja jõudsime selle valmistamise uurimisele lähedale TelegramBotSendMessage

Kood commons/operators.py tegeliku operaatoriga:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Siin, nagu kõik muu Airflow'is, on kõik väga lihtne:

  • Päritud alates BaseOperator, mis rakendab üsna palju Airflow-spetsiifilisi asju (vaadake oma vaba aega)
  • Deklareeritud väljad template_fields, milles Jinja otsib töötlemiseks makrosid.
  • Korraldati õiged argumendid poolt __init__(), määrake vajadusel vaikeseaded.
  • Me ei unustanud ka esivanema initsialiseerimist.
  • Avas vastava konksu TelegramBotHooksai sellelt kliendiobjekti.
  • Alistatud (uuesti määratletud) meetod BaseOperator.execute(), mida Airfow tõmbleb, kui saabub aeg operaatori käivitamiseks - selles teostame põhitoimingu, unustades sisse logida. (Logime sisse, muide, kohe sisse stdout и stderr - Õhuvool püüab kõik kinni, mähib selle ilusti kokku, vajadusel lagundab.)

Vaatame, mis meil on commons/hooks.py. Faili esimene osa koos konksuga:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ma isegi ei tea, mida siin seletada, märgin lihtsalt olulised punktid:

  • Me pärime, mõtleme argumentidele - enamikul juhtudel on see üks: conn_id;
  • Standardmeetodite ülekaal: piirasin ennast get_conn(), milles saan ühenduse parameetrid nime järgi ja saan lihtsalt jaotise extra (see on JSON-väli), kuhu panin (vastavalt enda juhistele!) Telegrami roboti märgi: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Loon meie näite TelegramBot, andes sellele konkreetse märgi.

See on kõik. Kliendi saad konksust kasutades TelegramBotHook().clent või TelegramBotHook().get_conn().

Ja faili teine ​​osa, milles teen Telegram REST API jaoks mikroümbrise, et mitte sama lohistada python-telegram-bot ühe meetodi jaoks sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Õige viis on see kõik kokku liita: TelegramBotSendMessage, TelegramBotHook, TelegramBot - lisage pistikprogrammi avalikku hoidlasse ja andke see avatud lähtekoodile.

Sel ajal, kui me seda kõike uurisime, õnnestus meie aruannete värskendused edukalt ebaõnnestuda ja saatsid mulle kanalis veateate. Ma lähen kontrollin, kas see on valesti...

Apache Airflow: ETL-i lihtsustamine
Midagi läks meie dogis katki! Kas see pole see, mida me ootasime? Täpselt nii!

Kas kavatsete valada?

Kas sa tunned, et jäin millestki ilma? Tundub, et ta lubas andmed SQL Serverist Verticasse üle kanda ja siis võttis kätte ja läks teemast kõrvale, kelm!

See julmus oli tahtlik, ma lihtsalt pidin teie jaoks terminoloogiat lahti mõtestama. Nüüd saate minna kaugemale.

Meie plaan oli selline:

  1. Do dag
  2. Loo ülesandeid
  3. Vaadake, kui ilus kõik on
  4. Määrake täidistele seansinumbrid
  5. Hankige andmeid SQL Serverist
  6. Sisestage andmed Verticasse
  7. Koguge statistikat

Nii et selle kõige käivitamiseks tegin meie jaoks väikese täienduse docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Seal tõstame:

  • Vertica hostina dwh kõige vaikeseadetega,
  • kolm SQL Serveri eksemplari,
  • täidame viimases olevad andmebaasid teatud andmetega (ärge mingil juhul uurige mssql_init.py!)

Käivitame kõik hea eelmisest veidi keerulisema käsu abil:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Seda, mida meie imeline randomiseerija genereeris, saate seda üksust kasutada Data Profiling/Ad Hoc Query:

Apache Airflow: ETL-i lihtsustamine
Peaasi, et seda analüütikutele ei näidata

täpsustada ETL seansid Ma ei tee seda, kõik on seal triviaalne: teeme aluse, selles on silt, mähime kõik kontekstihalduriga ja nüüd teeme seda:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Aeg on tulnud koguda meie andmeid meie poolteisesajast lauast. Teeme seda väga tagasihoidlike ridade abil:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Konksu abil saame Airflowst pymssql- ühendada
  2. Asendame päringus piirangu kuupäeva kujul – mallimootor viskab selle funktsiooni.
  3. Toidame meie palvet pandaskes meid saab DataFrame - see on meile tulevikus kasulik.

Ma kasutan asendust {dt} päringu parameetri asemel %s mitte sellepärast, et ma oleks kuri Pinocchio, vaid sellepärast pandas ei saa hakkama pymssql ja libiseb viimase params: Listkuigi ta tõesti tahab tuple.
Pange tähele ka seda, et arendaja pymssql otsustas teda enam mitte toetada ja on aeg välja kolida pyodbc.

Vaatame, millega Airflow meie funktsioonide argumente toppis:

Apache Airflow: ETL-i lihtsustamine

Kui andmeid pole, siis pole mõtet ka jätkata. Kuid imelik on ka täidist õnnestunuks pidada. Kuid see pole viga. A-ah-ah, mida teha?! Ja siin on see, mis:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException ütleb Airflow'le, et vigu pole, kuid jätame ülesande vahele. Liidesel ei ole rohelist ega punast ruutu, vaid roosa.

Loosime oma andmed mitu veergu:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Nimelt

  • Andmebaas, kust tellimusi võtsime,
  • Meie üleujutuse seansi ID (see on erinev iga ülesande jaoks),
  • Lähte ja tellimuse ID räsi – et lõplikus andmebaasis (kus kõik ühte tabelisse valatakse) oleks meil kordumatu tellimuse ID.

Jääb eelviimane samm: valage kõik Verticasse. Kummalisel kombel on üks silmapaistvamaid ja tõhusamaid viise seda teha CSV kaudu!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Valmistame spetsiaalse vastuvõtja StringIO.
  2. pandas paneb lahkesti meie DataFrame nagu CSV- read.
  3. Avame konksuga ühenduse meie lemmik Verticaga.
  4. Ja nüüd abiga copy() saatke meie andmed otse Vertikale!

Võtame juhilt, mitu rida täitusid, ja ütleme seansihaldurile, et kõik on korras:

session.loaded_rows = cursor.rowcount
session.successful = True

See on kõik.

Müügil loome sihtplaadi käsitsi. Siin lubasin endale väikese masina:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

ma kasutan VerticaOperator() Loon andmebaasi skeemi ja tabeli (kui neid muidugi veel ei ole). Peaasi on sõltuvused õigesti korraldada:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Kokkuvõtteks

- Noh, - ütles hiireke, - kas pole nüüd
Kas olete veendunud, et ma olen metsa kõige kohutavam loom?

Julia Donaldson, Gruffalo

Ma arvan, et kui minu kolleegidega oleks konkurents: kes loob ja käivitab kiiresti nullist ETL-i protsessi: nemad oma SSIS-i ja hiirega ja mina Airflowga ... Ja siis võrdleksime ka hoolduse lihtsust ... Vau, ma arvan, et nõustute sellega, et võidan neid igal rindel!

Kui veidi tõsisemalt, siis Apache Airflow - kirjeldades protsesse programmikoodi kujul - tegi minu töö ära palju mugavam ja nauditavam.

Selle piiramatu laiendatavus nii pistikprogrammide kui ka skaleeritavuse osas annab teile võimaluse kasutada Airflow'i peaaegu igas valdkonnas: isegi kogu andmete kogumise, ettevalmistamise ja töötlemise tsüklis, isegi rakettide väljalaskmisel (Marsile, muidugi).

Osa lõpp, viide ja teave

Reha, mille oleme teile kogunud

  • start_date. Jah, see on juba kohalik meem. Dougi peamine argument start_date kõik läbivad. Lühidalt, kui täpsustate start_date praegune kuupäev ja schedule_interval - ühel päeval, siis DAG alustab homme mitte varem.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Ja pole enam probleeme.

    Sellega on seotud veel üks käitusaegne viga: Task is missing the start_date parameter, mis viitab enamasti sellele, et unustasite dag operaatoriga siduda.

  • Kõik ühes masinas. Jah, ja alused (Airflow ise ja meie kate) ja veebiserver ja planeerija ja töötajad. Ja see isegi töötas. Kuid aja jooksul kasvas teenuste ülesannete arv ja kui PostgreSQL hakkas indeksile reageerima 20 ms asemel 5 sekundiga, võtsime selle ja viisime selle minema.
  • Kohalik täitja. Jah, me istume endiselt sellel ja oleme juba jõudnud kuristiku servale. Meile on seni piisanud LocalExecutorist, kuid nüüd on aeg laieneda vähemalt ühe töötajaga ja peame CeleryExecutorisse kolimiseks kõvasti tööd tegema. Ja arvestades asjaolu, et saate sellega töötada ühes masinas, ei takista miski teid kasutamast sellerit isegi serveris, mis "loomulikult ei lähe kunagi tootmisse, ausalt!"
  • Mittekasutamine sisseehitatud tööriistad:
    • Side teenindusmandaatide salvestamiseks,
    • SLA preilid vastata ülesannetele, mis ei õnnestunud õigel ajal,
    • xcom metaandmete vahetamiseks (ma ütlesin metaandmed!) dag ülesannete vahel.
  • Meili kuritarvitamine. No mis ma oskan öelda? Kõigi langenud ülesannete korduste kohta seati märguanded. Nüüd on minu töö Gmailil Airflow'st üle 90 100 meili ja veebimeilikork keeldub korraga üle XNUMX meili üles võtmast ja kustutamast.

Veel lõkse: Apache Airflow Pitfailid

Rohkem automatiseerimistööriistu

Selleks, et saaksime veelgi rohkem töötada oma peaga, mitte kätega, on Airflow meile ette valmistanud järgmise:

  • REST API - tal on endiselt Eksperimendi staatus, mis ei takista tal töötamast. Sellega saate mitte ainult hankida teavet päevade ja ülesannete kohta, vaid ka peatada/käivitada päevakava, luua DAG Run või basseini.
  • CLI - käsurea kaudu on saadaval palju tööriistu, mida ei ole WebUI kaudu lihtsalt ebamugav kasutada, vaid need üldiselt puuduvad. Näiteks:
    • backfill vaja tegumijuhtumite taaskäivitamiseks.
      Näiteks tulid analüütikud ja ütlesid: “Ja teil, seltsimees, on 1.-13. jaanuari andmetes lollus! Parandage, parandage, parandage, parandage!" Ja sa oled selline pliidiplaat:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Põhiteenus: initdb, resetdb, upgradedb, checkdb.
    • run, mis võimaldab teil käivitada ühe eksemplari ülesande ja isegi hinnata kõiki sõltuvusi. Lisaks saate seda käivitada LocalExecutor, isegi kui teil on selleriklaster.
    • Teeb enam-vähem sama asja test, ainult ka alustesse ei kirjuta midagi.
    • connections võimaldab massilist ühenduste loomist kestast.
  • Pythoni API - üsna kõva suhtlemisviis, mis on mõeldud pistikprogrammidele, mitte kubisema selles väikeste kätega. Aga kes takistab meid minemast /home/airflow/dags, jookse ipython ja hakata jamama? Näiteks saate eksportida kõik ühendused järgmise koodiga:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Ühenduse loomine Airflow metaandmebaasiga. Ma ei soovita sellele kirjutada, kuid ülesande olekute hankimine erinevate konkreetsete mõõdikute jaoks võib olla palju kiirem ja lihtsam kui mis tahes API kasutamine.

    Oletame, et kõik meie ülesanded ei ole idempotentsed, kuid mõnikord võivad need kukkuda ja see on normaalne. Aga paar ummistust on juba kahtlased ja oleks vaja kontrollida.

    Ettevaatust SQL-iga!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

Viited

Ja loomulikult on Google'i väljastamise esimesed kümme linki minu järjehoidjate kausta Airflow sisu.

Ja artiklis kasutatud lingid:

Allikas: www.habr.com

Ostke DDoS-kaitsega saitide jaoks usaldusväärne hostimine, VPS VDS-serverid 🔥 Osta usaldusväärne veebimajutus DDoS-kaitsega, VPS VDS serverid | ProHoster