Apache-lugvloei: maak ETL makliker

Hallo, ek is Dmitry Logvinenko - Data-ingenieur van die Analytics-afdeling van die Vezet-groep van maatskappye.

Ek sal jou vertel van 'n wonderlike hulpmiddel vir die ontwikkeling van ETL-prosesse - Apache Airflow. Maar Airflow is so veelsydig en veelsydig dat jy dit van nader moet bekyk, selfs al is jy nie by datavloei betrokke nie, maar het jy 'n behoefte om periodiek enige prosesse te begin en die uitvoering daarvan te monitor.

En ja, ek sal nie net vertel nie, maar ook wys: die program het baie kode, skermkiekies en aanbevelings.

Apache-lugvloei: maak ETL makliker
Wat jy gewoonlik sien as jy die woord Airflow / Wikimedia Commons google

Inhoudsopgawe

Inleiding

Apache Airflow is net soos Django:

  • in luislang geskryf
  • daar is 'n wonderlike administrasiepaneel,
  • onbepaald uitbrei

- net beter, en dit is gemaak vir heeltemal ander doeleindes, naamlik (soos dit voor die kata geskryf is):

  • uitvoer en monitering van take op 'n onbeperkte aantal masjiene (soos baie Seldery / Kubernetes en jou gewete jou sal toelaat)
  • met dinamiese werkvloeigenerering van baie maklik om Python-kode te skryf en te verstaan
  • en die vermoë om enige databasisse en API's met mekaar te verbind deur beide klaargemaakte komponente en tuisgemaakte plugins (wat uiters eenvoudig is).

Ons gebruik Apache Airflow soos volg:

  • ons samel data van verskeie bronne in (baie SQL Server- en PostgreSQL-gevalle, verskeie API's met toepassingsstatistieke, selfs 1C) in DWH en ODS (ons het Vertica en Clickhouse).
  • hoe gevorderd cron, wat die datakonsolidasieprosesse op die ODS begin, en ook hul instandhouding monitor.

Tot onlangs was ons behoeftes gedek deur een klein bediener met 32 ​​kerne en 50 GB RAM. In Airflow werk dit:

  • meer 200 dae (eintlik werkstrome, waarin ons take gevul het),
  • in elk gemiddeld 70 take,
  • hierdie goedheid begin (ook gemiddeld) een keer per uur.

En oor hoe ons uitgebrei het, sal ek hieronder skryf, maar kom ons definieer nou die über-probleem wat ons sal oplos:

Daar is drie oorspronklike SQL Servers, elk met 50 databasisse - gevalle van een projek, onderskeidelik, hulle het dieselfde struktuur (byna oral, mua-ha-ha), wat beteken dat elkeen 'n Orders-tabel het (gelukkig 'n tabel met daardie naam kan in enige besigheid ingedruk word). Ons neem die data deur diensvelde by te voeg (bronbediener, brondatabasis, ETL-taak-ID) en gooi dit naïef in, sê, Vertica.

Kom ons gaan!

Die hoofgedeelte, prakties (en 'n bietjie teoreties)

Hoekom doen ons (en jy)

Toe die bome groot was en ek eenvoudig was SQL-schik in een Russiese kleinhandel, ons het ETL-prosesse, oftewel datavloei, bedrieg deur twee gereedskap wat vir ons beskikbaar is:

  • Informatica Power Center - 'n uiters verspreide stelsel, uiters produktief, met sy eie hardeware, sy eie weergawe. Ek het God verhoede 1% van sy vermoëns gebruik. Hoekom? Wel, eerstens het hierdie koppelvlak, iewers uit die 380's, geestelik druk op ons geplaas. Tweedens, hierdie kontrepsie is ontwerp vir uiters spoggerige prosesse, woedende komponent-hergebruik en ander baie-belangrike ondernemingstruuks. Oor wat dit kos, soos die vlerk van die Airbus AXNUMX / jaar, sal ons niks sê nie.

    Pasop, 'n kiekie kan mense onder 30 'n bietjie seermaak

    Apache-lugvloei: maak ETL makliker

  • SQL Server Integrasie Bediener - ons het hierdie kameraad in ons intra-projekvloei gebruik. Wel, in werklikheid: ons gebruik reeds SQL Server, en dit sal op een of ander manier onredelik wees om nie sy ETL-nutsmiddels te gebruik nie. Alles daarin is goed: beide die koppelvlak is pragtig en die vorderingsverslae ... Maar dit is nie hoekom ons van sagtewareprodukte hou nie, o, nie hiervoor nie. Weergawe dit dtsx (wat XML is met nodes wat op stoor geskommel is) ons kan, maar wat is die punt? Hoe gaan dit met die maak van 'n taakpakket wat honderde tafels van een bediener na 'n ander sal sleep? Ja, wat 'n honderd, jou wysvinger sal van twintig stukke afval, deur op die muisknoppie te klik. Maar dit lyk beslis meer modieus:

    Apache-lugvloei: maak ETL makliker

Ons het beslis na uitweg gesoek. Geval selfs byna het by 'n selfgeskrewe SSIS-pakketgenerator gekom ...

…en toe kry 'n nuwe werk my. En Apache Airflow het my daarop ingehaal.

Toe ek uitvind dat ETL-prosesbeskrywings eenvoudige Python-kode is, het ek net nie gedans van vreugde nie. Dit is hoe datastrome weergawe en verskil is, en om tabelle met 'n enkele struktuur uit honderde databasisse in een teiken te gooi, het 'n kwessie van Python-kode in een en 'n half of twee 13 "skerms geword.

Die samestelling van die groep

Kom ons reël nie 'n heeltemal kleuterskool nie, en praat nie hier oor heeltemal ooglopende dinge nie, soos die installering van Airflow, jou gekose databasis, Seldery en ander gevalle wat in die dokke beskryf word.

Sodat ons dadelik met eksperimente kan begin, het ek geskets docker-compose.yml waarin:

  • Kom ons verhoog eintlik Lugvloei: Skeduleerder, Webbediener. Blom sal ook daar draai om selderytake te monitor (omdat dit reeds ingedruk is apache/airflow:1.10.10-python3.7, maar ons gee nie om nie)
  • PostgreSQL, waarin Airflow sy diensinligting sal skryf (skeduleerderdata, uitvoeringstatistieke, ens.), en Selery sal voltooide take merk;
  • Redis, wat as 'n taakmakelaar vir Seldery sal optree;
  • Seldery werker, wat betrokke sal wees by die direkte uitvoering van take.
  • Na gids ./dags ons sal ons lêers byvoeg met die beskrywing van dags. Hulle sal dadelik opgetel word, so dit is nie nodig om die hele stapel na elke nies te jongleren nie.

Op sommige plekke word die kode in die voorbeelde nie heeltemal gewys nie (om nie die teks deurmekaar te maak nie), maar iewers word dit in die proses gewysig. Volledige werkende kode voorbeelde kan gevind word in die bewaarplek https://github.com/dm-logv/airflow-tutorial.

Docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notas:

  • In die samestelling van die komposisie het ek grootliks op die bekende beeld staatgemaak pukkel/dok-lugvloei - maak seker dat u dit nagaan. Miskien het jy niks anders in jou lewe nodig nie.
  • Alle lugvloeiinstellings is nie net beskikbaar nie airflow.cfg, maar ook deur omgewingsveranderlikes (danksy die ontwikkelaars), wat ek kwaadwillig benut het.
  • Natuurlik is dit nie produksiegereed nie: ek het doelbewus nie hartklop op houers gesit nie, ek het my nie aan sekuriteit gesteur nie. Maar ek het die minimum gedoen wat geskik is vir ons eksperimenteerders.
  • Let daarop dat:
    • Die daglêer moet toeganklik wees vir beide die skeduleerder en die werkers.
    • Dieselfde geld vir alle derdeparty-biblioteke - hulle moet almal op masjiene met 'n skeduleerder en werkers geïnstalleer word.

Wel, nou is dit eenvoudig:

$ docker-compose up --scale worker=3

Nadat alles gestyg het, kan u na die webkoppelvlakke kyk:

Basiese konsepte

As jy niks in al hierdie "dae" verstaan ​​het nie, dan is hier 'n kort woordeboek:

  • Skeduleerder - die belangrikste oom in Airflow, wat beheer dat robotte hard werk, en nie 'n persoon nie: monitor die skedule, werk dag op, loods take.

    Oor die algemeen, in ouer weergawes, het hy probleme met geheue gehad (nee, nie geheueverlies nie, maar lekkasies) en die nalatenskapparameter het selfs in die konfigurasies gebly run_duration - sy herbegin interval. Maar nou is alles reg.

  • DAG (aka "dag") - "gerigte asikliese grafiek", maar so 'n definisie sal min mense vertel, maar in werklikheid is dit 'n houer vir take wat met mekaar in wisselwerking is (sien hieronder) of 'n analoog van Package in SSIS en Workflow in Informatica .

    Benewens daggies kan daar nog subdags wees, maar ons sal heel waarskynlik nie daarby uitkom nie.

  • DAG hardloop - geïnisialiseer dag, wat sy eie toegeken word execution_date. Dagrans van dieselfde dag kan parallel werk (as jy jou take natuurlik idempotent gemaak het).
  • operateur is stukke kode wat verantwoordelik is vir die uitvoering van 'n spesifieke aksie. Daar is drie tipes operateurs:
    • aksiesoos ons gunsteling PythonOperator, wat enige (geldige) Python-kode kan uitvoer;
    • oordra, wat data van plek tot plek vervoer, sê, MsSqlToHiveTransfer;
    • sensor aan die ander kant sal dit jou toelaat om te reageer of die verdere uitvoering van die dag te vertraag totdat 'n gebeurtenis plaasvind. HttpSensor kan die gespesifiseerde eindpunt trek, en wanneer die verlangde reaksie wag, begin die oordrag GoogleCloudStorageToS3Operator. ’n Nuuskierige gees sal vra: “hoekom? Jy kan immers herhalings reg in die operateur doen!” En dan, om nie die poel take met geskorste operateurs te verstop nie. Die sensor begin, kontroleer en sterf voor die volgende poging.
  • Taak - verklaarde operateurs, ongeag tipe, en verbonde aan die dag word bevorder tot die rang van taak.
  • taak instansie - toe die algemene beplanner besluit het dat dit tyd is om take in die stryd teen presterende-werkers te stuur (reg op die plek, as ons gebruik LocalExecutor of na 'n afgeleë nodus in die geval van CeleryExecutor), ken dit 'n konteks aan hulle toe (d.w.s. 'n stel veranderlikes - uitvoeringsparameters), brei opdrag- of navraagsjablone uit en voeg dit saam.

Ons genereer take

Laat ons eers die algemene skema van ons doug uiteensit, en dan sal ons meer en meer in die besonderhede duik, want ons pas 'n paar nie-triviale oplossings toe.

So, in sy eenvoudigste vorm, sal so 'n dag soos volg lyk:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Kom ons vind dit uit:

  • Eerstens voer ons die nodige libs in en iets anders;
  • sql_server_ds - Is List[namedtuple[str, str]] met die name van die verbindings van Airflow Connections en die databasisse waaruit ons ons bord sal neem;
  • dag - die aankondiging van ons dag, wat noodwendig in moet wees globals(), anders sal Airflow dit nie vind nie. Doug moet ook sê:
    • wat is sy naam orders - hierdie naam sal dan in die webkoppelvlak verskyn,
    • dat hy vanaf middernag op die agtste Julie sal werk,
    • en dit behoort ongeveer elke 6 uur te loop (vir geharde ouens hier in plaas van timedelta() toelaatbaar cron-lyn 0 0 0/6 ? * * *, vir die minder cool - 'n uitdrukking soos @daily);
  • workflow() sal die hoofwerk doen, maar nie nou nie. Vir eers sal ons net ons konteks in die log gooi.
  • En nou die eenvoudige magie van die skep van take:
    • ons hardloop deur ons bronne;
    • inisialiseer PythonOperator, wat ons dummy sal teregstel workflow(). Moenie vergeet om 'n unieke (binne die dag) naam van die taak te spesifiseer en die dag self te bind nie. Vlag provide_context op sy beurt sal addisionele argumente in die funksie gooi, wat ons versigtig sal versamel deur gebruik te maak **context.

Vir nou is dit al. Wat ons gekry het:

  • nuwe dag in die webkoppelvlak,
  • een en 'n half honderd take wat parallel uitgevoer sal word (as die lugvloei, seldery-instellings en bedienerkapasiteit dit toelaat).

Wel, amper het dit.

Apache-lugvloei: maak ETL makliker
Wie sal die afhanklikhede installeer?

Om hierdie hele ding te vereenvoudig, het ek ingeskroef docker-compose.yml verwerking requirements.txt op alle nodusse.

Nou is dit weg:

Apache-lugvloei: maak ETL makliker

Grys ​​blokkies is taakgevalle wat deur die skeduleerder verwerk word.

Ons wag 'n bietjie, die take word opgeraap deur die werkers:

Apache-lugvloei: maak ETL makliker

Die groenes het natuurlik hul werk suksesvol voltooi. Rooies is nie baie suksesvol nie.

Terloops, daar is geen gids op ons produk nie ./dags, daar is geen sinchronisasie tussen masjiene nie - alle dae lê in git op ons Gitlab, en Gitlab CI versprei opdaterings na masjiene wanneer hulle saamsmelt master.

'n Bietjie oor Blom

Terwyl die werkers ons fopspeen slaan, laat ons nog 'n hulpmiddel onthou wat vir ons iets kan wys - Blom.

Die heel eerste bladsy met opsommende inligting oor werker nodusse:

Apache-lugvloei: maak ETL makliker

Die mees intense bladsy met take wat te werk gegaan het:

Apache-lugvloei: maak ETL makliker

Die verveligste bladsy met die status van ons makelaar:

Apache-lugvloei: maak ETL makliker

Die helderste bladsy is met taakstatusgrafieke en hul uitvoeringstyd:

Apache-lugvloei: maak ETL makliker

Ons laai die onderlaaide

So, al die take het uitgewerk, jy kan die gewondes wegdra.

Apache-lugvloei: maak ETL makliker

En daar was baie gewondes – om een ​​of ander rede. In die geval van die korrekte gebruik van Airflow, dui hierdie einste blokkies aan dat die data beslis nie opgedaag het nie.

Jy moet die log dophou en die gevalle taakgevalle weer begin.

Deur op enige vierkant te klik, sal ons die aksies wat vir ons beskikbaar is, sien:

Apache-lugvloei: maak ETL makliker

Jy kan die gevallenes neem en skoonmaak. Dit wil sê, ons vergeet dat iets daar misluk het, en dieselfde voorbeeldtaak sal na die skeduleerder gaan.

Apache-lugvloei: maak ETL makliker

Dit is duidelik dat dit nie baie menslik is om dit met die muis met al die rooi blokkies te doen nie – dit is nie wat ons van Airflow verwag nie. Natuurlik het ons massavernietigingswapens: Browse/Task Instances

Apache-lugvloei: maak ETL makliker

Kom ons kies alles gelyktydig en herstel na nul, klik op die korrekte item:

Apache-lugvloei: maak ETL makliker

Na skoonmaak lyk ons ​​taxi's so (hulle wag reeds vir die skeduleerder om hulle te skeduleer):

Apache-lugvloei: maak ETL makliker

Verbindings, hake en ander veranderlikes

Dit is tyd om na die volgende DAG te kyk, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Het almal al ooit 'n verslagopdatering gedoen? Dit is weer sy: daar is 'n lys van bronne van waar om die data te kry; daar is 'n lys waar om te plaas; moenie vergeet om te toeter wanneer alles gebeur of gebreek het nie (wel, dit gaan nie oor ons nie, nee).

Kom ons gaan weer deur die lêer en kyk na die nuwe obskure goed:

  • from commons.operators import TelegramBotSendMessage - niks verhinder ons om ons eie operateurs te maak nie, wat ons benut het deur 'n klein omhulsel te maak om boodskappe na Unblocked te stuur. (Ons sal hieronder meer oor hierdie operateur praat);
  • default_args={} - dag kan dieselfde argumente aan al sy operateurs versprei;
  • to='{{ var.value.all_the_kings_men }}' - veld to ons sal nie hardkodeer hê nie, maar dinamies gegenereer met behulp van Jinja en 'n veranderlike met 'n lys e-posse, wat ek versigtig ingesit het Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — voorwaarde vir die aanvang van die operateur. In ons geval sal die brief net na die base vlieg as alle afhanklikhede uitgewerk het suksesvol;
  • tg_bot_conn_id='tg_main' - argumente conn_id aanvaar verbindings-ID's wat ons skep Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - boodskappe in Telegram sal net wegvlieg as daar gevalle take is;
  • task_concurrency=1 - ons verbied die gelyktydige bekendstelling van verskeie taakgevalle van een taak. Andersins kry ons die gelyktydige bekendstelling van verskeie VerticaOperator (kyk na een tafel);
  • report_update >> [email, tg] - almal VerticaOperator konvergeer in die stuur van briewe en boodskappe, soos hierdie:
    Apache-lugvloei: maak ETL makliker

    Maar aangesien kennisgewingsoperateurs verskillende bekendstellingstoestande het, sal slegs een werk. In die boomaansig lyk alles 'n bietjie minder visueel:
    Apache-lugvloei: maak ETL makliker

Ek sal 'n paar woorde sê oor makro's en hul vriende - veranderlikes.

Makro's is Jinja-plekhouers wat verskeie nuttige inligting in operateurargumente kan vervang. Byvoorbeeld, soos volg:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} sal uitbrei na die inhoud van die konteksveranderlike execution_date in die formaat YYYY-MM-DD: 2020-07-14. Die beste deel is dat konteksveranderlikes aan 'n spesifieke taakgeval vasgespyker word ('n vierkant in die boomaansig), en wanneer dit herbegin word, sal die plekhouers na dieselfde waardes uitbrei.

Die toegekende waardes kan bekyk word met die Weergegee-knoppie op elke taakgeval. Dit is hoe die taak om 'n brief te stuur:

Apache-lugvloei: maak ETL makliker

En so by die taak met die stuur van 'n boodskap:

Apache-lugvloei: maak ETL makliker

'n Volledige lys van ingeboude makro's vir die nuutste beskikbare weergawe is hier beskikbaar: makros verwysing

Boonop kan ons met behulp van plugins ons eie makro's verklaar, maar dit is 'n ander storie.

Benewens die vooraf gedefinieerde dinge, kan ons die waardes van ons veranderlikes vervang (ek het dit reeds in die kode hierbo gebruik). Kom ons skep in Admin/Variables 'n paar dinge:

Apache-lugvloei: maak ETL makliker

Alles wat jy kan gebruik:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Die waarde kan 'n skalaar wees, of dit kan ook JSON wees. In die geval van JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

gebruik net die pad na die verlangde sleutel: {{ var.json.bot_config.bot.token }}.

Ek sal letterlik een woord sê en een kiekie oor wys verbindings. Alles is elementêr hier: op die bladsy Admin/Connections ons skep 'n verbinding, voeg ons logins / wagwoorde en meer spesifieke parameters daar by. Soos hierdie:

Apache-lugvloei: maak ETL makliker

Wagwoorde kan geïnkripteer word (meer deeglik as die verstek), of jy kan die verbindingstipe weglaat (soos ek gedoen het vir tg_main) - die feit is dat die lys tipes hardbedraad is in Airflow-modelle en nie uitgebrei kan word sonder om in die bronkodes in te gaan nie (as ek skielik nie iets gegoogle het nie, korrigeer my asseblief), maar niks sal ons keer om krediete te kry net deur naam.

U kan ook verskeie verbindings met dieselfde naam maak: in hierdie geval die metode BaseHook.get_connection(), wat ons verbindings by die naam kry, sal gee ewekansig van verskeie naamgenote (dit sal meer logies wees om Round Robin te maak, maar kom ons los dit op die gewete van die Airflow-ontwikkelaars).

Veranderlikes en verbindings is beslis oulike hulpmiddels, maar dit is belangrik om nie die balans te verloor nie: watter dele van jou vloeie stoor jy in die kode self, en watter dele gee jy aan Airflow vir berging. Aan die een kant kan dit gerieflik wees om die waarde vinnig te verander, byvoorbeeld 'n posbus, deur die UI. Aan die ander kant is dit steeds 'n terugkeer na die muisklik, waarvan ons (ek) ontslae wou raak.

Om met verbindings te werk is een van die take hake. Oor die algemeen is Airflow-hake punte om dit aan derdeparty-dienste en biblioteke te koppel. Bv. JiraHook sal 'n kliënt vir ons oopmaak om met Jira te kommunikeer (jy kan take heen en weer skuif), en met behulp van SambaHook jy kan 'n plaaslike lêer na stoot smb-punt.

Ontleding van die pasgemaakte operateur

En ons het naby gekom om te kyk hoe dit gemaak word TelegramBotSendMessage

Kode commons/operators.py met die werklike operateur:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Hier, soos alles anders in Airflow, is alles baie eenvoudig:

  • Geërf van BaseOperator, wat 'n hele paar Airflow-spesifieke dinge implementeer (kyk na jou ontspanning)
  • Verklaarde velde template_fields, waarin Jinja na makro's sal soek om te verwerk.
  • Die regte argumente gereël vir __init__(), stel die verstekwaardes in waar nodig.
  • Ons het ook nie vergeet van die inisialisering van die voorvader nie.
  • Het die ooreenstemmende haak oopgemaak TelegramBotHook'n kliëntobjek daaruit ontvang.
  • Veranderde (herdefinieer) metode BaseOperator.execute(), wat Airfow sal draai wanneer die tyd aanbreek om die operateur te begin - daarin sal ons die hoofaksie implementeer, en vergeet om aan te meld. (Ons meld terloops dadelik aan stdout и stderr - Lugvloei sal alles onderskep, dit pragtig toevou, dit ontbind waar nodig.)

Kom ons kyk wat ons het commons/hooks.py. Die eerste deel van die lêer, met die haak self:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ek weet nie eers wat om hier te verduidelik nie, ek let net op die belangrike punte:

  • Ons erf, dink oor die argumente - in die meeste gevalle sal dit een wees: conn_id;
  • Oorheersende standaardmetodes: Ek het myself beperk get_conn(), waarin ek die verbindingsparameters by naam kry en net die afdeling kry extra (dit is 'n JSON-veld), waarin ek (volgens my eie instruksies!) die Telegram-bottoken plaas: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Ek skep 'n voorbeeld van ons TelegramBot, gee dit 'n spesifieke teken.

Dis al. Jy kan 'n kliënt uit 'n haak met behulp van TelegramBotHook().clent of TelegramBotHook().get_conn().

En die tweede deel van die lêer, waarin ek 'n mikrowrapper vir die Telegram REST API maak, om nie dieselfde te sleep nie python-telegram-bot vir een metode sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Die korrekte manier is om dit alles bymekaar te tel: TelegramBotSendMessage, TelegramBotHook, TelegramBot - in die inprop, plaas 'n publieke bewaarplek en gee dit aan Open Source.

Terwyl ons dit alles bestudeer het, het ons verslagopdaterings daarin geslaag om suksesvol te misluk en vir my 'n foutboodskap in die kanaal te stuur. Ek gaan kyk of dit verkeerd is...

Apache-lugvloei: maak ETL makliker
Iets het in ons doge gebreek! Is dit nie wat ons verwag het nie? Presies!

Gaan jy skink?

Voel jy ek het iets gemis? Dit blyk dat hy belowe het om data van SQL Server na Vertica oor te dra, en toe neem hy dit en skuif van die onderwerp af, die skelm!

Hierdie gruweldaad was opsetlik, ek moes bloot 'n paar terminologie vir jou ontsyfer. Nou kan jy verder gaan.

Ons plan was dit:

  1. Doen dag
  2. Genereer take
  3. Kyk hoe mooi is alles
  4. Ken sessienommers toe aan vullings
  5. Kry data vanaf SQL Server
  6. Plaas data in Vertica
  7. Versamel statistieke

Dus, om dit alles aan die gang te kry, het ek 'n klein toevoeging tot ons docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Daar lig ons op:

  • Vertika as gasheer dwh met die mees verstek instellings,
  • drie gevalle van SQL Server,
  • ons vul die databasisse in laasgenoemde met sommige data (moet in geen geval nie na kyk nie mssql_init.py!)

Ons begin al die goeie met behulp van 'n effens meer ingewikkelde opdrag as die vorige keer:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Wat ons wonderwerk-randomizer gegenereer het, kan jy die item gebruik Data Profiling/Ad Hoc Query:

Apache-lugvloei: maak ETL makliker
Die belangrikste ding is om dit nie aan ontleders te wys nie

uitbrei oor ETL sessies Ek sal nie, alles is onbenullig daar: ons maak 'n basis, daar is 'n teken daarin, ons draai alles toe met 'n konteksbestuurder, en nou doen ons dit:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Die tyd het aangebreek versamel ons data van ons een en 'n half honderd tafels. Kom ons doen dit met behulp van baie onpretensieuse lyne:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Met die hulp van 'n haak kry ons van Airflow pymssql- verbind
  2. Kom ons plaas 'n beperking in die vorm van 'n datum in die versoek - dit sal deur die sjabloonenjin in die funksie gegooi word.
  3. Voed ons versoek pandaswie sal ons kry DataFrame - dit sal vir ons nuttig wees in die toekoms.

Ek gebruik substitusie {dt} in plaas van 'n versoekparameter %s nie omdat ek 'n bose Pinocchio is nie, maar omdat pandas kan hanteer nie pymssql en glip die laaste een params: Listalhoewel hy regtig wil tuple.
Let ook daarop dat die ontwikkelaar pymssql besluit om hom nie meer te ondersteun nie, en dit is tyd om uit te trek pyodbc.

Kom ons kyk waarmee Airflow die argumente van ons funksies gevul het:

Apache-lugvloei: maak ETL makliker

As daar geen data is nie, is dit geen sin om voort te gaan nie. Maar dit is ook vreemd om die vulsel as suksesvol te beskou. Maar dit is nie 'n fout nie. A-ah-ah, wat om te doen?! En hier is wat:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException sê vir Airflow dat daar geen foute is nie, maar ons slaan die taak oor. Die koppelvlak sal nie 'n groen of rooi vierkant hê nie, maar pienk.

Kom ons gooi ons data veelvuldige kolomme:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Naamlik

  • Die databasis waaruit ons die bestellings geneem het,
  • ID van ons oorstromingsessie (dit sal anders wees vir elke taak),
  • 'n Hash van die bron en bestelling ID - sodat ons in die finale databasis (waar alles in een tabel gegooi word) 'n unieke bestelling ID het.

Die voorlaaste stap bly: gooi alles in Vertica. En, vreemd genoeg, is een van die skouspelagtigste en doeltreffendste maniere om dit te doen deur CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Ons maak 'n spesiale ontvanger StringIO.
  2. pandas sal vriendelik plaas ons DataFrame in die vorm CSV-lyne.
  3. Kom ons maak 'n verbinding met ons gunsteling Vertica met 'n haak.
  4. En nou met die hulp copy() stuur ons data direk na Vertika!

Ons sal by die bestuurder neem hoeveel lyne gevul is, en vir die sessiebestuurder sê dat alles reg is:

session.loaded_rows = cursor.rowcount
session.successful = True

Dit is alles.

Op die uitverkoping skep ons die teikenplaat met die hand. Hier het ek myself 'n klein masjien toegelaat:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

ek gebruik VerticaOperator() Ek skep 'n databasisskema en 'n tabel (as dit nie reeds bestaan ​​nie, natuurlik). Die belangrikste ding is om die afhanklikhede korrek te rangskik:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Opsomming

- Wel, - sê die muisie, - is dit nie nou nie
Is jy oortuig dat ek die aakligste dier in die bos is?

Julia Donaldson, The Gruffalo

Ek dink as ek en my kollegas 'n kompetisie gehad het: wie sal vinnig 'n ETL-proses van nuuts af skep en begin: hulle met hul SSIS en 'n muis en ek met Airflow ... En dan sou ons ook die gemak van onderhoud vergelyk ... Sjoe, ek dink jy sal saamstem dat ek hulle op alle fronte sal klop!

As 'n bietjie meer ernstig, dan het Apache Airflow - deur prosesse in die vorm van programkode te beskryf - my werk gedoen veel gemakliker en lekkerder.

Die onbeperkte uitbreidbaarheid daarvan, beide in terme van inproppe en geneigdheid tot skaalbaarheid, gee jou die geleentheid om Airflow in byna enige area te gebruik: selfs in die volle siklus van die insameling, voorbereiding en verwerking van data, selfs in die lansering van vuurpyle (na Mars, van kursus).

Deelfinaal, verwysing en inligting

Die hark wat ons vir jou ingesamel het

  • start_date. Ja, dit is reeds 'n plaaslike meme. Via Doug se hoofargument start_date almal slaag. Kortliks, as jy spesifiseer in start_date huidige datum, en schedule_interval - eendag, dan begin DAG môre nie vroeër nie.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    En nie meer probleme nie.

    Daar is nog 'n looptydfout wat daarmee verband hou: Task is missing the start_date parameter, wat meestal aandui dat jy vergeet het om aan die dagoperateur te bind.

  • Alles op een masjien. Ja, en basisse (Airflow self en ons coating), en 'n webbediener, en 'n skeduleerder, en werkers. En dit het selfs gewerk. Maar met verloop van tyd het die aantal take vir dienste gegroei, en toe PostgreSQL in 20 s in plaas van 5 ms op die indeks begin reageer het, het ons dit geneem en weggedra.
  • LocalExecutor. Ja, ons sit nog steeds daarop, en ons het reeds op die rand van die afgrond gekom. LocalExecutor was tot dusver genoeg vir ons, maar nou is dit tyd om met ten minste een werker uit te brei, en ons sal hard moet werk om na CeleryExecutor te skuif. En in die lig van die feit dat jy daarmee op een masjien kan werk, keer niks jou om Selery selfs op 'n bediener te gebruik nie, wat "natuurlik nooit in produksie sal gaan nie, eerlikwaar!"
  • Nie-gebruik ingeboude gereedskap:
    • Connections om diensbewyse te stoor,
    • SLA Mejuffroue om te reageer op take wat nie betyds uitgewerk het nie,
    • xcom vir metadata-uitruiling (ek het gesê metadata!) tussen dagtake.
  • Pos misbruik. Wel, wat kan ek sê? Waarskuwings is opgestel vir alle herhalings van gevalle take. Nou het my werk Gmail meer as 90 100 e-posse vanaf Airflow, en die webpos snuit weier om meer as XNUMX op 'n slag op te tel en uit te vee.

Nog slaggate: Apache-lugvloeiputte

Meer outomatiseringsinstrumente

Sodat ons selfs meer met ons koppe kan werk en nie met ons hande nie, het Airflow dit vir ons voorberei:

  • REST API - hy het steeds die status van Eksperimenteel, wat hom nie verhinder om te werk nie. Daarmee kan jy nie net inligting oor dag en take kry nie, maar ook 'n dag stop/begin, 'n DAG Run of 'n swembad skep.
  • CLI - Baie gereedskap is beskikbaar deur die opdragreël wat nie net ongerieflik is om deur die WebUI te gebruik nie, maar oor die algemeen afwesig is. Byvoorbeeld:
    • backfill nodig om taakgevalle te herbegin.
      Ontleders het byvoorbeeld kom sê: “En jy, kameraad, het nonsens in die data van 1 tot 13 Januarie! Maak dit reg, maak dit reg, maak dit reg, maak dit reg!" En jy is so 'n kookplaat:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Basisdiens: initdb, resetdb, upgradedb, checkdb.
    • run, wat jou toelaat om een ​​instansie taak uit te voer, en selfs op alle afhanklikhede te score. Boonop kan u dit via LocalExecutor, selfs al het jy 'n Seldery-kluster.
    • Doen omtrent dieselfde ding test, net ook in basisse skryf niks.
    • connections laat massaskepping van verbindings vanaf die dop toe.
  • python api - 'n taamlik harde manier van interaksie, wat bedoel is vir plugins, en nie met klein handjies daarin swerm nie. Maar wie gaan ons keer om na te gaan /home/airflow/dags, hardloop ipython en begin rondmors? U kan byvoorbeeld alle verbindings met die volgende kode uitvoer:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Koppel aan die Airflow-metadatabasis. Ek beveel nie aan om daaraan te skryf nie, maar om taakstate vir verskeie spesifieke maatstawwe te kry, kan baie vinniger en makliker wees as om enige van die API's te gebruik.

    Kom ons sê dat nie al ons take idempotent is nie, maar hulle kan soms val, en dit is normaal. Maar 'n paar blokkasies is reeds verdag, en dit sal nodig wees om na te gaan.

    Pasop SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

verwysings

En natuurlik is die eerste tien skakels vanaf die uitreiking van Google die inhoud van die Airflow-lêergids van my boekmerke.

En die skakels wat in die artikel gebruik word:

Bron: will.com