Apache Airflow: ETL gemakkelijker maken

Hallo, ik ben Dmitry Logvinenko - Data Engineer van de afdeling Analytics van de Vezet-bedrijvengroep.

Ik zal je vertellen over een geweldige tool voor het ontwikkelen van ETL-processen - Apache Airflow. Maar Airflow is zo veelzijdig en veelzijdig dat u er beter naar moet kijken, ook als u niet betrokken bent bij gegevensstromen, maar periodiek processen moet starten en de uitvoering ervan moet controleren.

En ja, ik zal het niet alleen vertellen, maar ook laten zien: het programma heeft veel code, screenshots en aanbevelingen.

Apache Airflow: ETL gemakkelijker maken
Wat je meestal ziet als je het woord Airflow / Wikimedia Commons googelt

inhoudsopgave

Introductie

Apache Airflow is net als Django:

  • geschreven in python
  • er is een geweldig admin-paneel,
  • onbeperkt uitbreidbaar

- alleen beter, en het is gemaakt voor totaal andere doeleinden, namelijk (zoals het voor de kat staat):

  • taken uitvoeren en bewaken op een onbeperkt aantal machines (zoveel Celery / Kubernetes en uw geweten u toestaan)
  • met dynamische workflowgeneratie van zeer eenvoudig te schrijven en Python-code te begrijpen
  • en de mogelijkheid om alle databases en API's met elkaar te verbinden met behulp van zowel kant-en-klare componenten als zelfgemaakte plug-ins (wat uiterst eenvoudig is).

We gebruiken Apache Airflow als volgt:

  • we verzamelen gegevens uit verschillende bronnen (veel SQL Server- en PostgreSQL-instanties, verschillende API's met applicatiestatistieken, zelfs 1C) in DWH en ODS (we hebben Vertica en Clickhouse).
  • hoe geavanceerd cron, die de gegevensconsolidatieprocessen op de ODS start en ook het onderhoud ervan bewaakt.

Tot voor kort werden onze behoeften gedekt door één kleine server met 32 ​​kernen en 50 GB RAM. In Airflow werkt dit:

  • meer 200 dagen (eigenlijk workflows, waarin we taken propten),
  • in elk gemiddeld 70 taken,
  • deze goedheid begint (ook gemiddeld) een keer per uur.

En over hoe we uitbreidden, zal ik hieronder schrijven, maar laten we nu het über-probleem definiëren dat we zullen oplossen:

Er zijn drie bron-SQL-servers, elk met 50 databases - respectievelijk instanties van één project, ze hebben dezelfde structuur (bijna overal, mua-ha-ha), wat betekent dat elk een Orders-tabel heeft (gelukkig een tabel met die naam kan in elk bedrijf worden gepusht). We nemen de gegevens door servicevelden toe te voegen (bronserver, brondatabase, ETL-taak-ID) en gooien ze naïef in bijvoorbeeld Vertica.

Laten we gaan!

Het grootste deel, praktisch (en een beetje theoretisch)

Waarom doen wij (en jij)

Toen de bomen groot waren en ik simpel was SQL-schik in een Russische winkel, hebben we ETL-processen, oftewel gegevensstromen, opgelicht met behulp van twee tools die voor ons beschikbaar zijn:

  • Informatica Power Center - een extreem verspreid systeem, extreem productief, met zijn eigen hardware, zijn eigen versiebeheer. Ik gebruikte God verhoede 1% van zijn mogelijkheden. Waarom? Ten eerste heeft deze interface, ergens uit de jaren 380, ons mentaal onder druk gezet. Ten tweede is dit apparaat ontworpen voor buitengewoon fraaie processen, furieus hergebruik van componenten en andere zeer belangrijke bedrijfstrucs. Over wat het kost, zoals de vleugel van de Airbus AXNUMX / jaar, zullen we niets zeggen.

    Pas op, een screenshot kan mensen onder de 30 een beetje pijn doen

    Apache Airflow: ETL gemakkelijker maken

  • SQL Server-integratieserver - we gebruikten deze kameraad in onze intra-projectstromen. Nou, in feite: we gebruiken al SQL Server en het zou op de een of andere manier onredelijk zijn om de ETL-tools ervan niet te gebruiken. Alles erin is goed: zowel de interface is mooi als de voortgangsrapporten ... Maar dit is niet waarom we van softwareproducten houden, oh, niet hiervoor. Versie het dtsx (dat is XML met knooppunten geschud bij opslaan) dat kunnen we, maar wat heeft het voor zin? Hoe zit het met het maken van een taakpakket dat honderden tabellen van de ene server naar de andere zal slepen? Ja, wat honderd, je wijsvinger valt af van twintig stukken, klikken op de muisknop. Maar het ziet er zeker modieuzer uit:

    Apache Airflow: ETL gemakkelijker maken

We hebben zeker naar uitwegen gezocht. Geval zelfs bijna kwam tot een zelfgeschreven SSIS-pakketgenerator ...

…en toen vond ik een nieuwe baan. En Apache Airflow haalde me daarmee in.

Toen ik ontdekte dat ETL-procesbeschrijvingen eenvoudige Python-code zijn, danste ik gewoon niet van vreugde. Zo werden datastromen geversioneerd en gedifferentieerd, en het gieten van tabellen met een enkele structuur van honderden databases in één doel werd een kwestie van Python-code in anderhalf of twee schermen van 13 inch.

Samenstellen van het cluster

Laten we geen complete kleuterschool inrichten, en het hier niet hebben over volkomen voor de hand liggende dingen, zoals het installeren van Airflow, de door u gekozen database, Celery en andere gevallen die in de dokken worden beschreven.

Zodat we meteen met experimenten kunnen beginnen, schetste ik docker-compose.yml waarin:

  • Laten we eigenlijk verhogen Luchtstroom: Planner, Webserver. Flower zal daar ook draaien om Celery-taken te volgen (omdat het al in apache/airflow:1.10.10-python3.7, maar dat vinden we niet erg)
  • PostgreSQL, waarin Airflow zijn service-informatie zal schrijven (schedulergegevens, uitvoeringsstatistieken, enz.) en Celery voltooide taken zal markeren;
  • Redis, die zal optreden als taakmakelaar voor Celery;
  • Selderij werknemer, die zich zal bezighouden met de directe uitvoering van taken.
  • Naar map ./dags we zullen onze bestanden toevoegen met de beschrijving van dags. Ze worden meteen opgepakt, dus het is niet nodig om na elke niesbui met de hele stapel te jongleren.

Op sommige plaatsen wordt de code in de voorbeelden niet volledig weergegeven (om de tekst niet onoverzichtelijk te maken), maar ergens wordt deze tijdens het proces aangepast. Volledige werkende codevoorbeelden zijn te vinden in de repository https://github.com/dm-logv/airflow-tutorial.

havenarbeider-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Opmerkingen:

  • Bij de montage van de compositie ben ik grotendeels uitgegaan van het bekende beeld puckel/docker-luchtstroom - zorg ervoor dat je het bekijkt. Misschien heb je niets anders nodig in je leven.
  • Alle Airflow-instellingen zijn niet alleen beschikbaar via airflow.cfg, maar ook via omgevingsvariabelen (met dank aan de ontwikkelaars), waar ik kwaadwillig misbruik van heb gemaakt.
  • Het is natuurlijk niet productieklaar: ik heb bewust geen hartslagen op containers gezet, ik heb me niet beziggehouden met beveiliging. Maar ik deed het minimum dat geschikt was voor onze onderzoekers.
  • Let daar op:
    • De dagmap moet toegankelijk zijn voor zowel de planner als de medewerkers.
    • Hetzelfde geldt voor alle bibliotheken van derden - ze moeten allemaal worden geïnstalleerd op machines met een planner en werkers.

Nou, nu is het simpel:

$ docker-compose up --scale worker=3

Nadat alles is gestegen, kunt u naar de webinterfaces kijken:

Basisbegrippen

Als je niets begreep van al deze "dagen", dan is hier een kort woordenboek:

  • Scheduler - de belangrijkste oom in Airflow, die bepaalt dat robots hard werken, en niet een persoon: bewaakt het schema, werkt dagen bij, lanceert taken.

    Over het algemeen had hij in oudere versies problemen met het geheugen (nee, geen geheugenverlies, maar lekken) en de legacy-parameter bleef zelfs in de configuraties run_duration - het herstartinterval. Maar nu is alles in orde.

  • DAG (ook bekend als "dag") - "gerichte acyclische grafiek", maar een dergelijke definitie zal weinig mensen vertellen, maar in feite is het een container voor taken die met elkaar communiceren (zie hieronder) of een analoog van Package in SSIS en Workflow in Informatica .

    Naast dags kunnen er nog steeds subdags zijn, maar daar komen we hoogstwaarschijnlijk niet aan toe.

  • DAG rennen - geïnitialiseerde dag, die zijn eigen is toegewezen execution_date. Dagrans van dezelfde dag kunnen parallel werken (als je je taken natuurlijk idempotent hebt gemaakt).
  • Operator zijn stukjes code die verantwoordelijk zijn voor het uitvoeren van een specifieke actie. Er zijn drie soorten operatoren:
    • actiezoals onze favoriet PythonOperator, die elke (geldige) Python-code kan uitvoeren;
    • overdracht, die gegevens transporteren van plaats naar plaats, laten we zeggen MsSqlToHiveTransfer;
    • sensor aan de andere kant stelt het je in staat om te reageren of de verdere uitvoering van de dag te vertragen totdat er een gebeurtenis plaatsvindt. HttpSensor kan het opgegeven eindpunt ophalen en wanneer het gewenste antwoord wacht, start u de overdracht GoogleCloudStorageToS3Operator. Een nieuwsgierige geest zal vragen: “waarom? Herhalingen kun je immers gewoon in de operator doen!” En dan, om de pool van taken niet te verstoppen met geschorste operators. De sensor start, controleert en sterft voor de volgende poging.
  • Taak - verklaarde operators, ongeacht het type, en gekoppeld aan de dag worden gepromoveerd tot de rang van taak.
  • taak instantie - toen de algemene planner besloot dat het tijd was om taken in de strijd te sturen naar uitvoerders (ter plekke, als we LocalExecutor of naar een knooppunt op afstand in het geval van CeleryExecutor), wijst het er een context aan toe (d.w.z. een set variabelen - uitvoeringsparameters), breidt opdracht- of querysjablonen uit en bundelt ze.

We genereren taken

Laten we eerst het algemene schema van onze doug schetsen, en dan zullen we steeds meer in de details duiken, omdat we enkele niet-triviale oplossingen toepassen.

Dus in zijn eenvoudigste vorm ziet zo'n dag er als volgt uit:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Laten we begrijpen:

  • Eerst importeren we de benodigde libs en iets anders;
  • sql_server_ds - Is List[namedtuple[str, str]] met de namen van de verbindingen van Airflow Connections en de databases waaruit we ons bord zullen halen;
  • dag - de aankondiging van onze dag, die noodzakelijkerwijs binnen moet zijn globals(), anders zal Airflow het niet vinden. Doug moet ook zeggen:
    • wat is zijn naam orders - deze naam verschijnt dan in de webinterface,
    • dat hij vanaf middernacht op XNUMX juli zal werken,
    • en het zou ongeveer elke 6 uur moeten draaien (voor stoere jongens hier in plaats van timedelta() toelaatbaar cron-lijn 0 0 0/6 ? * * *, voor de minder coole - een uitdrukking als @daily);
  • workflow() zal het hoofdwerk doen, maar niet nu. Voor nu dumpen we gewoon onze context in het logboek.
  • En nu de simpele magie van het maken van taken:
    • we lopen door onze bronnen;
    • initialiseren PythonOperator, die onze pop zal executeren workflow(). Vergeet niet een unieke (binnen de dag) naam van de taak op te geven en de dag zelf te binden. Vlag provide_context zal op zijn beurt extra argumenten in de functie gieten, die we zorgvuldig zullen verzamelen met behulp van **context.

Voor nu is dat alles. Wat we hebben:

  • nieuwe dag in de webinterface,
  • anderhalfhonderd taken die parallel worden uitgevoerd (als de Airflow, Celery-instellingen en servercapaciteit het toelaten).

Nou, bijna begrepen.

Apache Airflow: ETL gemakkelijker maken
Wie installeert de afhankelijkheden?

Om dit hele ding te vereenvoudigen, heb ik het verpest docker-compose.yml verwerken requirements.txt op alle knooppunten.

Nu is het weg:

Apache Airflow: ETL gemakkelijker maken

Grijze vierkanten zijn taakexemplaren die door de planner worden verwerkt.

We wachten even, de taken worden opgepikt door de arbeiders:

Apache Airflow: ETL gemakkelijker maken

De groene hebben hun werk natuurlijk met succes afgerond. Reds zijn niet erg succesvol.

Er is trouwens geen map op onze prod ./dags, er is geen synchronisatie tussen machines - alle dagen liggen erin git op ons Gitlab, en Gitlab CI distribueert updates naar machines bij het samenvoegen master.

Even over Bloem

Laten we, terwijl de arbeiders onze fopspenen afranselen, een ander hulpmiddel onthouden dat ons iets kan laten zien - Bloem.

De allereerste pagina met beknopte informatie over werkknooppunten:

Apache Airflow: ETL gemakkelijker maken

De meest intense pagina met taken die aan het werk gingen:

Apache Airflow: ETL gemakkelijker maken

De saaiste pagina met de status van onze makelaar:

Apache Airflow: ETL gemakkelijker maken

De helderste pagina is met taakstatusgrafieken en hun uitvoeringstijd:

Apache Airflow: ETL gemakkelijker maken

We laden de onderbelaste

Dus alle taken zijn gelukt, je kunt de gewonden wegdragen.

Apache Airflow: ETL gemakkelijker maken

En er waren veel gewonden - om de een of andere reden. In het geval van correct gebruik van Airflow geven juist deze vierkanten aan dat de gegevens zeker niet zijn aangekomen.

U moet het logboek bekijken en de gevallen taakinstanties opnieuw opstarten.

Door op een vierkant te klikken, zien we de acties die voor ons beschikbaar zijn:

Apache Airflow: ETL gemakkelijker maken

Je kunt de gevallenen nemen en wissen. Dat wil zeggen, we vergeten dat daar iets is mislukt en dezelfde instantietaak gaat naar de planner.

Apache Airflow: ETL gemakkelijker maken

Het is duidelijk dat dit met de muis met alle rode vierkanten niet erg humaan is - dit is niet wat we van Airflow verwachten. Natuurlijk hebben we massavernietigingswapens: Browse/Task Instances

Apache Airflow: ETL gemakkelijker maken

Laten we alles in één keer selecteren en resetten naar nul, klik op het juiste item:

Apache Airflow: ETL gemakkelijker maken

Na het schoonmaken zien onze taxi's er zo uit (ze wachten al op de planner om ze in te plannen):

Apache Airflow: ETL gemakkelijker maken

Verbindingen, haken en andere variabelen

Het is tijd om naar de volgende DAG te kijken, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Heeft iedereen wel eens een rapportupdate gedaan? Dit is haar weer: er is een lijst met bronnen waar je de gegevens vandaan kunt halen; er is een lijst waar te plaatsen; vergeet niet te toeteren als alles gebeurde of kapot ging (nou, dit gaat niet over ons, nee).

Laten we het bestand nog eens doornemen en kijken naar de nieuwe obscure dingen:

  • from commons.operators import TelegramBotSendMessage - niets belet ons om onze eigen operators te maken, waar we van hebben geprofiteerd door een kleine verpakking te maken voor het verzenden van berichten naar Unblocked. (We zullen hieronder meer over deze operator praten);
  • default_args={} - dag kan dezelfde argumenten doorgeven aan al zijn operatoren;
  • to='{{ var.value.all_the_kings_men }}' - veld to we zullen niet hardcoded hebben, maar dynamisch gegenereerd met behulp van Jinja en een variabele met een lijst met e-mails, die ik zorgvuldig heb ingevoerd Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — voorwaarde voor het starten van de aandrijving. In ons geval vliegt de brief alleen naar de bazen als alle afhankelijkheden zijn uitgewerkt met succes;
  • tg_bot_conn_id='tg_main' - argumenten conn_id accepteer verbindings-ID's waarin we maken Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - berichten in Telegram vliegen alleen weg als er gevallen taken zijn;
  • task_concurrency=1 - we verbieden de gelijktijdige lancering van meerdere taakexemplaren van één taak. Anders krijgen we de gelijktijdige lancering van meerdere VerticaOperator (kijkend naar een tafel);
  • report_update >> [email, tg] - Alle VerticaOperator convergeren in het verzenden van brieven en berichten, zoals deze:
    Apache Airflow: ETL gemakkelijker maken

    Maar aangezien exploitanten van kennisgevers verschillende startvoorwaarden hebben, werkt er maar één. In de Tree View ziet alles er wat minder visueel uit:
    Apache Airflow: ETL gemakkelijker maken

Ik zal er een paar woorden over zeggen macro's en hun vrienden - variabelen.

Macro's zijn Jinja-placeholders die verschillende nuttige informatie kunnen vervangen door operatorargumenten. Bijvoorbeeld als volgt:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} wordt uitgebreid naar de inhoud van de contextvariabele execution_date in het formaat YYYY-MM-DD: 2020-07-14. Het beste deel is dat contextvariabelen worden genageld aan een specifieke taakinstantie (een vierkant in de boomstructuur) en wanneer ze opnieuw worden gestart, worden de tijdelijke aanduidingen uitgebreid naar dezelfde waarden.

De toegewezen waarden kunnen worden bekeken met behulp van de knop Rendered op elke taakinstantie. Zo werkt de opdracht bij het versturen van een brief:

Apache Airflow: ETL gemakkelijker maken

En dus bij de taak met het verzenden van een bericht:

Apache Airflow: ETL gemakkelijker maken

Een volledige lijst met ingebouwde macro's voor de nieuwste beschikbare versie is hier beschikbaar: verwijzing naar macro's

Bovendien kunnen we met behulp van plug-ins onze eigen macro's declareren, maar dat is een ander verhaal.

Naast de vooraf gedefinieerde dingen kunnen we de waarden van onze variabelen vervangen (ik heb dit al in de bovenstaande code gebruikt). Laten we creëren Admin/Variables een paar dingen:

Apache Airflow: ETL gemakkelijker maken

Alles wat je kunt gebruiken:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

De waarde kan scalair zijn, of het kan ook JSON zijn. In het geval van JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

gebruik gewoon het pad naar de gewenste sleutel: {{ var.json.bot_config.bot.token }}.

Ik zal letterlijk één woord zeggen en één screenshot laten zien over verbinding. Alles is hier elementair: op de pagina Admin/Connections we maken een verbinding, voegen daar onze logins / wachtwoorden en meer specifieke parameters toe. Soortgelijk:

Apache Airflow: ETL gemakkelijker maken

Wachtwoorden kunnen worden gecodeerd (grondiger dan de standaard), of u kunt het verbindingstype weglaten (zoals ik deed voor tg_main) - het feit is dat de lijst met typen vast is aangesloten op Airflow-modellen en niet kan worden uitgebreid zonder in de broncodes te gaan (als ik plotseling iets niet heb gegoogeld, corrigeer me dan alstublieft), maar niets zal ons ervan weerhouden credits te krijgen door gewoon naam.

Je kunt ook meerdere verbindingen maken met dezelfde naam: in dit geval de methode BaseHook.get_connection(), die ons verbindingen op naam geeft, zal geven willekeurig van verschillende naamgenoten (het zou logischer zijn om Round Robin te maken, maar laten we het aan het geweten van de Airflow-ontwikkelaars overlaten).

Variabelen en verbindingen zijn zeker coole tools, maar het is belangrijk om de balans niet te verliezen: welke delen van je flows sla je op in de code zelf, en welke delen geef je aan Airflow voor opslag. Enerzijds kan het handig zijn om via de UI snel de waarde van bijvoorbeeld een brievenbus te wijzigen. Aan de andere kant is dit nog steeds een terugkeer naar de muisklik, waar wij (ik) vanaf wilden.

Het werken met verbindingen is een van de taken haken. Over het algemeen zijn Airflow-haken punten om het te verbinden met services en bibliotheken van derden. bijv. JiraHook zal een client voor ons openen om met Jira te communiceren (je kunt taken heen en weer verplaatsen), en met behulp van SambaHook je kunt een lokaal bestand naar pushen smb-punt.

De aangepaste operator ontleden

En we kwamen bijna kijken hoe het gemaakt is TelegramBotSendMessage

code commons/operators.py met de eigenlijke operator:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Hier is, net als al het andere in Airflow, alles heel eenvoudig:

  • Geërfd van BaseOperator, die nogal wat Airflow-specifieke dingen implementeert (kijk op je gemak)
  • Gedeclareerde velden template_fields, waarin Jinja op zoek gaat naar macro's om te verwerken.
  • Regelde de juiste argumenten voor __init__(), stel de standaardwaarden in waar nodig.
  • We zijn ook de initialisatie van de voorouder niet vergeten.
  • Opende de bijbehorende haak TelegramBotHookheeft er een clientobject van ontvangen.
  • Overschreven (opnieuw gedefinieerde) methode BaseOperator.execute(), die Airfow zal trillen als het tijd is om de operator te lanceren - daarin zullen we de hoofdactie implementeren, waarbij we vergeten in te loggen. (We loggen trouwens meteen in stdout и stderr - Luchtstroom zal alles onderscheppen, mooi inpakken, waar nodig ontleden.)

Laten we eens kijken wat we hebben commons/hooks.py. Het eerste deel van het bestand, met de haak zelf:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ik weet niet eens wat ik hier moet uitleggen, ik noteer alleen de belangrijke punten:

  • We erven, denken na over de argumenten - in de meeste gevallen zal het er een zijn: conn_id;
  • Standaardmethodes overschrijven: ik beperk mezelf get_conn(), waarin ik de verbindingsparameters op naam krijg en alleen de sectie krijg extra (dit is een JSON-veld), waarin ik (volgens mijn eigen instructies!) het Telegram-bottoken heb geplaatst: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Ik maak een instantie van our TelegramBot, waardoor het een specifiek token krijgt.

Dat is alles. U kunt een klant van een haak halen met behulp van TelegramBotHook().clent of TelegramBotHook().get_conn().

En het tweede deel van het bestand, waarin ik een microwrapper maak voor de Telegram REST API, om niet hetzelfde te slepen python-telegram-bot voor één methode sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

De juiste manier is om alles bij elkaar op te tellen: TelegramBotSendMessage, TelegramBotHook, TelegramBot - in de plug-in, plaats deze in een openbare repository en geef deze aan Open Source.

Terwijl we dit allemaal aan het bestuderen waren, slaagden onze rapportupdates erin om succesvol te mislukken en stuurden me een foutmelding in het kanaal. Ik ga eens kijken of het niet klopt...

Apache Airflow: ETL gemakkelijker maken
Er brak iets in onze doge! Is dat niet wat we verwachtten? Precies!

Ga je gieten?

Heb je het gevoel dat ik iets gemist heb? Het lijkt erop dat hij beloofde gegevens over te zetten van SQL Server naar Vertica, en toen nam hij het en ging van het onderwerp af, de schurk!

Deze gruweldaad was opzettelijk, ik moest gewoon wat terminologie voor je ontcijferen. Nu kun je verder gaan.

Ons plan was dit:

  1. Doe dag
  2. Taken genereren
  3. Zie hoe mooi alles is
  4. Wijs sessienummers toe aan vullingen
  5. Gegevens ophalen van SQL Server
  6. Zet gegevens in Vertica
  7. Statistieken verzamelen

Dus om dit allemaal aan de gang te krijgen, heb ik een kleine toevoeging gemaakt aan onze docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Daar verhogen we:

  • Vertica als gastheer dwh met de meeste standaardinstellingen,
  • drie instanties van SQL Server,
  • we vullen de databases in de laatste met wat gegevens (kijk in geen geval naar mssql_init.py!)

We lanceren al het goede met behulp van een iets ingewikkelder commando dan de vorige keer:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Wat onze wonder-randomizer heeft gegenereerd, kunt u het item gebruiken Data Profiling/Ad Hoc Query:

Apache Airflow: ETL gemakkelijker maken
Het belangrijkste is om het niet aan analisten te laten zien

uitwijden over ETL-sessies Dat doe ik niet, alles is daar triviaal: we maken een basis, er staat een teken in, we pakken alles in met een contextmanager, en nu doen we dit:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

sessie.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

De tijd is gekomen onze gegevens verzamelen van onze anderhalfhonderd tafels. Laten we dit doen met behulp van zeer pretentieloze regels:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Met behulp van een haak krijgen we van Airflow pymssql-aansluiten
  2. Laten we een beperking in de vorm van een datum in het verzoek vervangen - deze wordt door de sjabloon-engine in de functie gegooid.
  3. Ons verzoek voeden pandaswie krijgt ons DataFrame - het zal in de toekomst nuttig voor ons zijn.

Ik gebruik vervanging {dt} in plaats van een verzoekparameter %s niet omdat ik een gemene Pinokkio ben, maar omdat pandas kan het niet aan pymssql en glijdt de laatste uit params: Listhoewel hij het echt wil tuple.
Merk ook op dat de ontwikkelaar pymssql besloten hem niet meer te steunen, en het is tijd om te verhuizen pyodbc.

Laten we eens kijken waarmee Airflow de argumenten van onze functies heeft gevuld:

Apache Airflow: ETL gemakkelijker maken

Als er geen gegevens zijn, heeft het geen zin om door te gaan. Maar het is ook vreemd om de vulling als geslaagd te beschouwen. Maar dit is geen vergissing. A-ah-ah, wat te doen?! En dit is wat:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException vertelt Airflow dat er geen fouten zijn, maar we slaan de taak over. De interface krijgt geen groen of rood vierkantje, maar roze.

Laten we onze gegevens weggooien meerdere kolommen:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Namelijk

  • De database waaruit we de bestellingen haalden,
  • ID van onze flooding-sessie (het zal anders zijn voor elke taak),
  • Een hash van de bron- en order-ID - zodat we in de uiteindelijke database (waar alles in één tabel wordt gegoten) een unieke order-ID hebben.

Rest nog de voorlaatste stap: giet alles in Vertica. En, vreemd genoeg, een van de meest spectaculaire en efficiënte manieren om dit te doen is via CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. We maken een speciale ontvanger StringIO.
  2. pandas zal vriendelijk onze DataFrame als CSV-lijnen.
  3. Laten we een verbinding openen met onze favoriete Vertica met een haak.
  4. En nu met de hulp copy() stuur onze gegevens direct naar Vertika!

We nemen van de chauffeur op hoeveel regels er zijn gevuld en vertellen de sessiemanager dat alles in orde is:

session.loaded_rows = cursor.rowcount
session.successful = True

Dat is alles.

Bij de verkoop maken we de doelplaat handmatig aan. Hier heb ik mezelf een kleine machine toegestaan:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

ik gebruik VerticaOperator() Ik maak een databaseschema en een tabel (als ze natuurlijk nog niet bestaan). Het belangrijkste is om de afhankelijkheden correct te rangschikken:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Opsomming

- Nou, - zei de kleine muis, - is het niet, nu
Ben je ervan overtuigd dat ik het meest verschrikkelijke dier in het bos ben?

Julia Donaldson, De Gruffalo

Ik denk dat als mijn collega's en ik een wedstrijd hadden: wie zal er snel een ETL-proces vanaf nul maken en starten: zij met hun SSIS en een muis en ik met Airflow ... En dan zouden we ook het onderhoudsgemak vergelijken ... Wauw, ik denk dat je het ermee eens zult zijn dat ik ze op alle fronten zal verslaan!

Als het iets serieuzer was, dan deed Apache Airflow - door processen te beschrijven in de vorm van programmacode - mijn werk veel comfortabeler en leuker.

De onbeperkte uitbreidbaarheid, zowel wat betreft plug-ins als aanleg voor schaalbaarheid, geeft u de mogelijkheid om Airflow op vrijwel elk gebied te gebruiken: zelfs in de volledige cyclus van verzamelen, voorbereiden en verwerken van gegevens, zelfs bij het lanceren van raketten (naar Mars, van cursus).

Deel definitief, referentie en informatie

De hark die we voor je hebben verzameld

  • start_date. Ja, dit is al een lokale meme. Via het belangrijkste argument van Doug start_date allemaal voorbij. Kortom, als u aangeeft in start_date huidige datum, en schedule_interval - op een dag, dan begint DAG morgen niet eerder.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    En geen problemen meer.

    Er is nog een runtime-fout aan verbonden: Task is missing the start_date parameter, wat meestal aangeeft dat u bent vergeten te binden aan de dagoperator.

  • Allemaal op één machine. Ja, en bases (Airflow zelf en onze coating), en een webserver, en een planner, en arbeiders. En het werkte zelfs. Maar na verloop van tijd groeide het aantal taken voor services, en toen PostgreSQL begon te reageren op de index in 20 s in plaats van 5 ms, hebben we het overgenomen en meegenomen.
  • LokaleUitvoerder. Ja, we zitten er nog steeds op en we zijn al aan de rand van de afgrond gekomen. LocalExecutor is tot nu toe genoeg voor ons geweest, maar nu is het tijd om uit te breiden met ten minste één werknemer, en we zullen hard moeten werken om over te stappen naar CeleryExecutor. En gezien het feit dat je er op één machine mee kunt werken, houdt niets je tegen om Celery ook op een server te gebruiken, die “natuurlijk nooit in productie zal gaan, eerlijk gezegd!”
  • Niet-gebruik ingebouwde hulpmiddelen:
    • aansluitingen om servicereferenties op te slaan,
    • SLA mist om te reageren op taken die niet op tijd zijn gelukt,
    • xcom voor metadata-uitwisseling (ik zei metadata!) tussen dagtaken.
  • Mail misbruik. Nou wat kan ik zeggen? Er werden waarschuwingen ingesteld voor alle herhalingen van gevallen taken. Nu heeft mijn werk Gmail >90 e-mails van Airflow, en de webmail-snuit weigert meer dan 100 tegelijk op te halen en te verwijderen.

Meer valkuilen: Apache Airflow-valkuilen

Meer automatiseringstools

Om nog meer met ons hoofd te kunnen werken in plaats van met onze handen, heeft Airflow dit voor ons voorbereid:

  • REST API - hij heeft nog steeds de status van Experimenteel, wat hem niet belet om te werken. Hiermee kun je niet alleen informatie krijgen over dagen en taken, maar ook een dag stoppen/starten, een DAG Run of een pool maken.
  • CLI - er zijn veel tools beschikbaar via de opdrachtregel die niet alleen onhandig zijn om te gebruiken via de WebUI, maar die over het algemeen afwezig zijn. Bijvoorbeeld:
    • backfill nodig om taakexemplaren opnieuw te starten.
      Zo kwamen er analisten en zeiden: “En jij, kameraad, heb onzin in de data van 1 tot 13 januari! Repareer het, repareer het, repareer het, repareer het!" En je bent zo'n kookplaat:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Basisservice: initdb, resetdb, upgradedb, checkdb.
    • run, waarmee u één instantietaak kunt uitvoeren en zelfs kunt scoren op alle afhankelijkheden. Bovendien kun je het via LocalExecutor, zelfs als u een Celery-cluster hebt.
    • Doet ongeveer hetzelfde test, alleen ook in basen schrijft niets.
    • connections maakt massale creatie van verbindingen vanuit de shell mogelijk.
  • Python API - een nogal hardcore manier van interactie, die bedoeld is voor plug-ins, en er niet met kleine handjes in zwermt. Maar wie houdt ons tegen om naar toe te gaan /home/airflow/dags, loop ipython en beginnen te rommelen? U kunt bijvoorbeeld alle verbindingen exporteren met de volgende code:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Verbinding maken met de Airflow-metadatabase. Ik raad niet aan om ernaar te schrijven, maar het verkrijgen van taakstatussen voor verschillende specifieke statistieken kan veel sneller en gemakkelijker zijn dan het gebruik van een van de API's.

    Laten we zeggen dat niet al onze taken idempotent zijn, maar ze kunnen soms vallen, en dit is normaal. Maar een paar verstoppingen zijn al verdacht en zouden moeten worden gecontroleerd.

    Pas op voor SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

referenties

En natuurlijk zijn de eerste tien links van de uitgifte van Google de inhoud van de Airflow-map uit mijn bladwijzers.

En de links die in het artikel worden gebruikt:

Bron: www.habr.com