Apache Airflow: Hêsantirkirina ETL

Silav, ez Dmitry Logvinenko me - Endezyarê Daneyên Beşa Analytics ya koma pargîdaniyên Vezet.

Ez ê ji we re li ser amûrek ecêb ji bo pêşkeftina pêvajoyên ETL - Apache Airflow vebêjim. Lê Airflow ew qas pirreng û piralî ye ku divê hûn ji nêz ve lê binihêrin heke hûn di herikîna daneyan de nebin jî, lê hewcedariya we heye ku hûn pêvajoyek periyodîk bidin destpêkirin û pêkanîna wan bişopînin.

Erê, ez ê ne tenê bibêjim, lê di heman demê de destnîşan bikim: bername gelek kod, dîmen û pêşniyar hene.

Apache Airflow: Hêsantirkirina ETL
Dema ku hûn peyva Airflow / Wikimedia Commons di google de bi gelemperî hûn dibînin

Table of Contents

Pîrozbahiyê

Apache Airflow mîna Django ye:

  • bi python hatiye nivîsandin
  • panelek mezin a rêveberiyê heye,
  • bêdawî berfireh dibe

- tenê çêtir e, û ew ji bo mebestên bi tevahî cûda hate çêkirin, ango (wek ku berî katê hatî nivîsandin):

  • peywirên xebitandin û çavdêrîkirina li ser hejmarek bêsînor makîneyan (wek ku gelek Celery / Kubernetes û wijdanê we dê destûrê bidin we)
  • bi hilberîna xebata dînamîkî ya ji nivîsandin û fêmkirina koda Python pir hêsan e
  • û şiyana girêdana her databas û API-yê bi hev re bi karanîna hem hêmanên amade û hem jî pêvekên malê-çêkirî (ku zehf hêsan e).

Em Apache Airflow bi vî rengî bikar tînin:

  • em daneyan ji çavkaniyên cihêreng (gelek mînakên SQL Server û PostgreSQL, API-yên cihêreng ên bi pîvanên serîlêdanê, tewra 1C) di DWH û ODS de berhev dikin (me Vertica û Clickhouse hene).
  • çiqas pêşketî cron, ku pêvajoyên hevgirtina daneyan li ser ODS dest pê dike, û di heman demê de lênihêrîna wan jî dişopîne.

Heya vê dawiyê, hewcedariyên me ji hêla serverek piçûk ve bi 32 core û 50 GB RAM ve dihatin vegirtin. Di Airflow de, ev kar dike:

  • более 200 dar (bi rastî pêlên xebatê, ku me tê de peywir pêk anî),
  • di her navencî de 70 wezîfe,
  • ev qencî dest pê dike (di heman demê de bi navînî) saetê carekê.

Û li ser ka me çawa berfireh kir, ez ê li jêr binivîsim, lê naha em pirsgirêka über-a ku em ê çareser bikin diyar bikin:

Sê Pêşkêşkerên SQL-ya çavkaniyê hene, her yek bi 50 databasên - mînakên yek projeyê, bi rêzê ve, wan xwedî heman avahî ne (hema hema li her deverê, mua-ha-ha), ku tê vê wateyê ku her yekê tabloyek Order heye (bextane, tabloyek bi wê nav dikare di her karsaziyê de were avêtin). Em daneyan bi lêzêdekirina qadên karûbarê (pêşkêşkara çavkaniyê, databasa çavkaniyê, Nasnameya peywirê ya ETL) digirin û wan bi dilşikestî diavêjin, bêje, Vertica.

Em herin!

Beşa sereke, pratîk (û piçek teorîk)

Çima em (û hûn)

Dema ku darên mezin bûn û ez sade bûm SQL-di yek firotgehek rûsî de, me bi du amûrên ku ji me re hene bi pêvajoyên ETL yên wekî herikîna daneyê xapandin:

  • Navenda Hêza Informatica - pergalek pir belavbûyî, zehf hilber, bi hardware xwe, guhertoya xwe. Min Xwedê nehêle 1% ji kapasîteyên wê bikar anîn. Çima? Belê, berî her tiştî, ev navber, ji salên 380-an vir ve, bi zihniyet zext li me kir. Ya duyemîn, ev tevlihevî ji bo pêvajoyên pir xweşik, ji nû ve karanîna hêmanên hêrs û hîleyên din ên pargîdanî yên pir girîng hatî çêkirin. Li ser lêçûna wê, mîna baskê Airbus AXNUMX / sal, em ê tiştek nebêjin.

    Hişyar bin, dîmenek dikare hinekî zirarê bide mirovên di bin 30 salî de

    Apache Airflow: Hêsantirkirina ETL

  • Pêşkêşkara Yekbûna Servera SQL - Me ev rêheval di nava projeyên xwe de bi kar anî. Welê, bi rastî: em jixwe SQL Server bikar tînin, û ew ê hinekî ne maqûl be ku em amûrên wê ETL bikar neynin. Her tişt di wê de baş e: hem navbeynkar xweşik e, hem jî raporên pêşkeftinê ... Lê ne ji ber vê yekê em ji hilberên nermalavê hez dikin, oh, ne ji bo vê yekê. Guhertoya wê dtsx (ku XML bi girêkên ku li ser hilanînê veqetandî ye) em dikarin, lê mesele çi ye? Ma hûn pakêtek peywirê çêbikin ku dê bi sedan tablo ji serverek berbi ya din bikişîne? Erê, çi sed e, tiliya te ya nîşanê dê ji bîst perçeyan bikeve, bişkoka mişkê bike. Lê bê guman ew modatir xuya dike:

    Apache Airflow: Hêsantirkirina ETL

Bê guman em li rêyên derketinê geriyan. Case even hema hema hat ber jeneratorek pakêtê ya SSIS-a xwe-nivîskî ...

…û paşê karekî nû min dît. Û Apache Airflow min li ser wê girt.

Gava ku min fêhm kir ku danasînên pêvajoya ETL koda Python-ê hêsan in, min tenê ji şahiyê dans nekir. Bi vî rengî herikên daneyê hatin guhertokirin û cûda kirin, û rijandina tabloyên bi avahiyek yekane ji bi sedan databasan di nav yek armancê de bû mijara koda Python di yek û nîv an du ekranên 13 ”an de.

Komkirina komê

Werin em bi tevahî zarokxaneyek saz nekin, û li vir li ser tiştên bi tevahî eşkere neaxivin, mîna sazkirina Airflow, databasa weya bijartî, Celery û rewşên din ên ku di doşkeyan de têne diyar kirin.

Ji bo ku em tavilê dest bi ceribandinan bikin, min xêz kir docker-compose.yml di kîjanê de:

  • Bi rastî em rabin şibaka: Bername, Webserver. Kulîlk dê li wir jî dizivire da ku karên Celery bişopîne (ji ber ku ew ji berê ve hatî avêtin apache/airflow:1.10.10-python3.7, lê em xem nakin)
  • PostgreSQL, ku tê de Airflow dê agahdariya karûbarê xwe binivîse (daneyên plansazker, statîstîkên darvekirinê, hwd.), û Celery dê karên qedandî nîşan bide;
  • Redis, ku dê ji bo Celery wekî brokerek peywirê tevbigere;
  • Karkerê sêlê, ku dê bi cîbicîkirina rasterast a karan ve mijûl bibe.
  • Ji bo peldankê ./dags em ê pelên xwe bi danasîna dags lê zêde bikin. Ew ê di firînê de werin hilanîn, ji ber vê yekê ne hewce ye ku piştî her birûskê bi tevahî stêrk biqelînin.

Li hin deveran, koda di mînakan de bi tevahî nayê xuyang kirin (da ku nivîsê tevlihev neke), lê li cîhek ew di pêvajoyê de tê guheztin. Nimûneyên kodên xebatê yên bêkêmasî dikarin di depoyê de werin dîtin https://github.com/dm-logv/airflow-tutorial.

docker-berhevkirin.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • Di civîna kompozîsyonê de, min bi piranî xwe spart wêneyê naskirî puckel / docker-airflow - Bê guman wê kontrol bikin. Dibe ku hûn di jiyana xwe de ne hewce ne tiştek din.
  • Hemî mîhengên hewayê ne tenê bi navgîniyê têne peyda kirin airflow.cfg, di heman demê de bi navgîniya guhêrbarên hawîrdorê (spas ji pêşdebiran re), ku min bi xerabî jê sûd girt.
  • Bi xwezayî, ew ne amade ye ji bo hilberînê: Min bi qestî lêdanên dil nexist ser konteyneran, min ji ewlehiyê aciz nekir. Lê min ji bo ceribandinên me kêmtirîn guncan kir.
  • Têbînî ku:
    • Peldanka dag divê hem ji plansazker û hem jî ji xebatkaran re bigihîje.
    • Heman tişt ji bo hemî pirtûkxaneyên partiya sêyemîn jî derbas dibe - divê ew hemî li ser makîneyên bi plansazker û xebatkaran werin saz kirin.

Belê, niha ew hêsan e:

$ docker-compose up --scale worker=3

Piştî ku her tişt rabe, hûn dikarin li navgînên malperê binêrin:

Têgehên bingehîn

Ger we di van hemî "dag"an de tiştek fam nekiribe, wê hingê li vir ferhengek kurt heye:

  • Scheduler - Mamê herî girîng ê di Airflow de, kontrol dike ku robot bi dijwarî dixebitin, û ne kesek: nexşeyê dişopîne, dakêşan nûve dike, karan dest pê dike.

    Bi gelemperî, di guhertoyên kevn de, wî bi bîranînê re pirsgirêkên wî hebûn (na, ne amnesia, lê diherikin) û pîvana mîrasê jî di mîhengan de ma. run_duration - navbera wê ya ji nû ve destpêkirinê. Lê niha her tişt baş e.

  • DAG (aka "dag") - "grafiya asîklîk a rêvekirî", lê pênaseyek wusa dê ji çend kesan re vebêje, lê bi rastî ew konteynir e ji bo peywirên ku bi hevûdu re têkildar in (li jêr binêre) an analogek Pakêtê di SSIS û Workflow di Informatica de .

    Ji xeynî dagan, dibe ku hîn jî binerd hebin, lê bi îhtîmalek mezin em ê negihîjin wan.

  • DAG Run - dag destpêkî, ku bi xwe ve hatî destnîşan kirin execution_date. Dagranên heman dagê dikarin paralel bixebitin (heke we karên xwe bêhêz kirin, bê guman).
  • Makînevan parçeyên kodê ne ku ji bo pêkanîna çalakiyek taybetî berpirsiyar in. Sê celeb operator hene:
    • çalakîwek favorite me PythonOperator, ku dikare her kodek Python (derbasdar) bixebite;
    • derbaskirin, ku daneyan ji cihekî vediguhezîne cîh, bêje, MsSqlToHiveTransfer;
    • sensor ji hêla din ve, ew ê bihêle ku hûn bertek nîşan bidin an jî înfazkirina din a dagê hêdî bikin heya ku bûyerek çêbibe. HttpSensor dikare xala dawiya diyarkirî bikişîne, û gava ku bersiva xwestî li bendê ye, veguheztinê dest pê bike GoogleCloudStorageToS3Operator. Hişê lêkolîner dê bipirse: "Çima? Beriya her tiştî, hûn dikarin di operatorê de rast dubare bikin!” Û dûv re, ji bo ku hewza peywiran bi operatorên rawestandî re neyê girtin. Sensor dest pê dike, kontrol dike û berî hewildana din dimire.
  • Karî - Operatorên diyarkirî, bêyî ku celeb be, û bi dag ve girêdayî ne di rêza peywirê de têne pêşve xistin.
  • mînaka peywirê - gava ku plansazê giştî biryar da ku ew dem e ku peywiran bişîne şer li ser karker-karkeran (rast di cih de, ger em bikar bînin LocalExecutor an ji bo node dûr di doza CeleryExecutor), ew ji wan re çarçoveyek destnîşan dike (ango, komek guhêrbar - parametreyên darvekirinê), şablonên ferman an pirsê berfireh dike, û wan berhev dike.

Em peywiran çêdikin

Pêşî, bila em nexşeya giştî ya doçka xwe xêz bikin, û dûv re jî em ê bêtir û bêtir hûrguliyan bişopînin, ji ber ku em hin çareseriyên ne-pîvan bicîh tînin.

Ji ber vê yekê, di forma xweya herî hêsan de, dagek wusa dê bi vî rengî xuya bike:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Ka em wê bihesibînin:

  • Pêşî, em libên pêwîst û tiştekî din;
  • sql_server_ds Ye List[namedtuple[str, str]] bi navên girêdanên ji Girêdanên Airflow û databasên ku em ê plakaya xwe jê bigirin;
  • dag - Ragihandina dagê me, ku pêdivî ye ku tê de be globals(), Wekî din Airflow wê nabîne. Doug jî hewce dike ku bêje:
    • navê wî çi ye orders - Dê ev nav paşê di navgîniya malperê de xuya bibe,
    • ku ew ê ji nîvê şeva heştê Tîrmehê bixebite,
    • û divê ew bimeşe, hema hema her 6 demjimêran (ji bo xortên dijwar li vir li şûna timedelta() dibe cron-xet 0 0 0/6 ? * * *, ji bo kêm sar - îfadeyek mîna @daily);
  • workflow() dê karê sereke bike, lê ne niha. Heya nuha, em ê tenê naveroka xwe bavêjin têketinê.
  • Û naha sêrbaziya hêsan a afirandina karan:
    • em bi çavkaniyên xwe diherikin;
    • destpêk kirin PythonOperator, ku dê dumê me îdam bike workflow(). Ji bîr nekin ku navek yekta (di nav dagê de) ya peywirê diyar bikin û dag bixwe girêdin. Al provide_context di encamê de, dê argumanên din jî birijîne nav fonksiyonê, ku em ê bi baldarî bikar bînin berhev bikin **context.

Ji bo niha, ew hemû. Tiştê ku me girt:

  • xala nû di navgîniya malperê de,
  • sed û nîv peywirên ku dê bi paralelî bêne darve kirin (heke mîhengên Airflow, Celery û kapasîteya serverê destûrê bidin).

Baş e, hema ew girt.

Apache Airflow: Hêsantirkirina ETL
Kî dê girêdanan saz bike?

Ji bo hêsankirina vê tevahiyê, min şikand docker-compose.yml xebitandinî requirements.txt li ser hemû girêkan.

Niha ew çû:

Apache Airflow: Hêsantirkirina ETL

Qadên gewr mînakên peywirê ne ku ji hêla plansazker ve têne hilberandin.

Em hinekî li bendê dimînin, peywir ji hêla karkeran ve têne hilanîn:

Apache Airflow: Hêsantirkirina ETL

Yên kesk, helbet karê xwe bi serkeftî qedandine. Sor ne pir serkeftî ne.

Bi awayê, peldankek li ser hilberê me tune ./dags, di navbera makîneyan de hevdengiyek tune - hemî dag di nav de ne git li ser Gitlab-a me, û Gitlab CI dema ku tev li hev dibin nûvekirinan li makîneyan belav dike master.

Hinekî li ser Flower

Dema ku xebatkar pîsîkên me diqelînin, em amûrek din a ku dikare tiştek nîşanî me bide bi bîr bînin - Kulîlk.

Rûpelê yekem bi kurteya agahdariya li ser girêkên karker:

Apache Airflow: Hêsantirkirina ETL

Rûpelê herî zexm bi karên ku ketine xebatê:

Apache Airflow: Hêsantirkirina ETL

Rûpelê herî bêzar bi statûya brokerê me:

Apache Airflow: Hêsantirkirina ETL

Rûpelê herî geş bi grafikên rewşa peywirê û dema pêkanîna wan e:

Apache Airflow: Hêsantirkirina ETL

Em barkirina bin bar dikin

Ji ber vê yekê, hemî peywir bi ser ketin, hûn dikarin birîndaran rakin.

Apache Airflow: Hêsantirkirina ETL

Û gelek birîndar bûn - ji ber sedemek an sedemek din. Di doza karanîna rast a Airflow de, van çargoşeyan destnîşan dikin ku dane bê guman nehatine.

Pêdivî ye ku hûn têketinê temaşe bikin û bûyerên peywirê yên ketî ji nû ve bidin destpêkirin.

Bi tikandina li ser çargoşeyekê, em ê çalakiyên ku ji me re peyda dibin bibînin:

Apache Airflow: Hêsantirkirina ETL

Hûn dikarin bikevin û paqij bikin. Ango, em ji bîr dikin ku tiştek li wir têk çûye, û heman peywira nimûneyê dê biçe ser plansazker.

Apache Airflow: Hêsantirkirina ETL

Eşkere ye ku kirina vê yekê bi mişkê bi hemî çarçikên sor re ne pir mirovahî ye - ev ne ya ku em ji Airflow hêvî dikin e. Bi xwezayî, çekên me yên qirkirina komî hene: Browse/Task Instances

Apache Airflow: Hêsantirkirina ETL

Ka em her tiştî bi yekcarî hilbijêrin û sifirê vegerînin, tiştê rast bikirtînin:

Apache Airflow: Hêsantirkirina ETL

Piştî paqijkirinê, taksiyên me bi vî rengî xuya dikin (ew jixwe li benda plansazker in ku wan bi rê ve bibe):

Apache Airflow: Hêsantirkirina ETL

Girêdan, çeng û guhêrbarên din

Wext e ku meriv li DAG-a din binêre, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Ma her kesî carî nûvekirina raporê kiriye? Ev dîsa ew e: navnîşek jêderan heye ku meriv ji ku derê daneyê bigire; lîsteyek heye ku meriv lê bixin; ji bîr nekin ku gava her tişt qewimî an şikest, xêz bikin (baş, ev ne li ser me ye, na).

Ka em dîsa li pelê bigerin û li tiştên nû yên nezelal binêrin:

  • from commons.operators import TelegramBotSendMessage - Tiştek me nahêle ku em operatorên xwe çêbikin, ku me jê sûd werdigire bi çêkirina pêçek piçûk ji bo şandina peyaman ji Unblocked re. (Em ê li jêr li ser vê operatorê bêtir biaxivin);
  • default_args={} - dag dikare heman argumanan li hemî operatorên xwe belav bike;
  • to='{{ var.value.all_the_kings_men }}' - zeviyê to em ê ne kodkirî nebin, lê bi dînamîk bi karanîna Jinja û guhêrbarek bi navnîşek e-nameyê, ku min bi baldarî tê de danîne, têne çêkirin. Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - şertê destpêkirina operatorê. Di rewşa me de, nameyê tenê heke hemî girêdan bi ser ketine dê berbi patronan ve biçe bi serkeftî;
  • tg_bot_conn_id='tg_main' - argumentên conn_id Nasnameyên pêwendiyê yên ku em tê de diafirînin qebûl bikin Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - peyamên di Telegram de tenê heke peywirên ketî hebin dê bifirin;
  • task_concurrency=1 - em destpêkirina hevdemî ya çend nimûneyên peywirê yên yek peywirê qedexe dikin. Wekî din, em ê hemwext destpêkirina çendan bistînin VerticaOperator (li yek maseyê dinêre);
  • report_update >> [email, tg] - hemî VerticaOperator di şandina name û peyaman de li hev bicivin, bi vî rengî:
    Apache Airflow: Hêsantirkirina ETL

    Lê ji ber ku operatorên ragihandinê xwedî şert û mercên destpêkirina cihê ne, tenê yek dê bixebite. Di Dîtina Darê de, her tişt hinekî kêmtir dîtbar xuya dike:
    Apache Airflow: Hêsantirkirina ETL

Ez ê li ser çend peyvan bibêjim makro û hevalên wan - variables.

Makro cîhên Jinja ne ku dikarin agahdariyên cihêreng ên kêrhatî di nav argumanên operatorê de biguhezînin. Mînakî, bi vî rengî:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} dê berbi naveroka guhêrbara kontekstê ve bibe execution_date di forma YYYY-MM-DD: 2020-07-14. Beşa çêtirîn ev e ku guhêrbarên çarçoveyê li mînakek peywirek taybetî (çargoşeyek di Nîşana Darê de) têne girêdan, û gava ku ji nû ve dest pê bikin, cîhgir dê li heman nirxan berfireh bibin.

Nirxên destnîşankirî dikarin bi karanîna bişkoka Rendered li ser her mînakek peywirê werin dîtin. Karê şandina nameyê bi vî rengî ye:

Apache Airflow: Hêsantirkirina ETL

Û bi vî awayî di karê şandina peyamê de:

Apache Airflow: Hêsantirkirina ETL

Navnîşek bêkêmasî ya makroyên çêkirî yên ji bo guhertoya herî dawî ya berdest li vir heye: referansa makro

Wekî din, bi alîkariya pêvekan, em dikarin makroyên xwe ragihînin, lê ew çîrokek din e.

Ji bilî tiştên pêşwext, em dikarin nirxên guhêrbarên xwe biguhezînin (min berê di koda jorîn de ev bikar anî). Werin em biafirînin Admin/Variables çend tişt:

Apache Airflow: Hêsantirkirina ETL

Her tiştê ku hûn dikarin bikar bînin:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Nirx dikare pîvanek be, an jî dikare JSON be. Di doza JSON de:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

tenê riya mifteya xwestinê bikar bînin: {{ var.json.bot_config.bot.token }}.

Ez ê bi rastî yek gotinê bibêjim û yek dîmenek li ser nîşan bidim girêdan. Li vir her tişt bingehîn e: li ser rûpelê Admin/Connections em têkiliyek çêdikin, têketin / şîfreyên xwe û pîvanên taybetî li wir zêde dikin. Welî evê:

Apache Airflow: Hêsantirkirina ETL

Şîfre dikarin bêne şîfre kirin (ji ya xwerû bi hûrgulî), an hûn dikarin celebê girêdanê bihêlin (wek ku min ji bo tg_main) - Rastî ev e ku navnîşa celeban di modelên Airflow de hişk e û bêyî ketina kodên çavkaniyê nayê berfireh kirin (heke ji nişka ve min tiştek google nekiriye, ji kerema xwe min rast bike), lê tiştek dê me nehêle ku em tenê bi kredî bistînin. nav.

Di heman demê de hûn dikarin bi heman navî çend pêwendiyan çêbikin: di vê rewşê de, rêbaz BaseHook.get_connection(), ku ji me re girêdanên bi navê, dê bide bêpayîn ji çend navan (dê maqûltir be ku meriv Round Robin çêbike, lê bila em wê li ser wijdana pêşdebirên Airflow bihêlin).

Guherîn û Têkilî bê guman amûrên xweş in, lê girîng e ku hûn hevsengiyê winda nekin: hûn kîjan parçeyên herikên xwe di kodê de hildigirin, û hûn kîjan parçeyan ji bo hilanînê didin Airflow. Ji aliyek ve, ew dikare hêsan be ku meriv zû nirxê biguhezîne, mînakî, qutiyek e-nameyê, bi navgîniya UI-yê. Ji hêla din ve, ev hîn jî vegerek e li klîk mişkê, ya ku me (min) dixwest jê xilas bibe.

Karkirina bi girêdanan yek ji wan karan e hooks. Bi gelemperî, çîpên Airflow xalên girêdana wê bi karûbar û pirtûkxaneyên partiya sêyemîn re ne. Mînak, JiraHook dê ji me re xerîdarek veke ku em bi Jira re têkilî daynin (hûn dikarin peywiran bi paş û paş ve bikin), û bi alîkariya SambaHook hûn dikarin pelek herêmî bişopînin smb-cî.

Parsing operatorê xwerû

Û em nêzîkî dîtina wê yekê bûn ku ew çawa hatî çêkirin TelegramBotSendMessage

code commons/operators.py bi operatorê rastîn:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Li vir, mîna her tiştê din di Airflow de, her tişt pir hêsan e:

  • mîrata ji BaseOperator, ku çend tiştên taybetî yên hewayê bicîh tîne (li dema vala xwe binihêre)
  • Zeviyên diyar kirin template_fields, ku tê de Jinja dê li makroyan bigere ku pêvajoyê bike.
  • Ji bo argumanên rast rêz kirin __init__(), li cîhê ku pêwîst be pêşandan saz bikin.
  • Me destpêkirina bav û kalan jî ji bîr nekir.
  • Kulika têkildar vekir TelegramBotHookjê tiştekî muşterek standiye.
  • Rêbaza sergirtî (ji nû ve pênasekirin). BaseOperator.execute(), ku Airfow ê gava ku dema destpêkirina operatorê tê biqelişe - di wê de em ê çalakiya sereke bicîh bînin, ji bîr nekin ku têkevinê. (Em bi awayê, rast têkevin stdout и stderr - Herikîna hewayê dê her tiştî bigire, wê bi xweşikî pêça, li cîhê ku hewce bike wê hilweşîne.)

Ka em bibînin ka çi heye commons/hooks.py. Beşa yekem a pelê, bi çengelê bixwe:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ez jî nizanim li vir çi şirove bikim, ez ê tenê xalên girîng destnîşan bikim:

  • Em mîras digirin, li ser argumanan difikirin - di pir rewşan de ew ê yek be: conn_id;
  • Rêbazên standard ên berbiçav: Min xwe bi sînor kir get_conn(), ku tê de ez pîvanên pêwendiyê bi navê xwe distînim û tenê beşê distînim extra (ev zeviyek JSON e), ku ez tê de (li gorî rêwerzên xwe!) tokena botê ya Telegram danîne: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Ez mînakek me çêdikim TelegramBot, nîşanek taybetî dide.

Navê pêger. Hûn dikarin xerîdarek ji hook bikar bînin TelegramBotHook().clent an TelegramBotHook().get_conn().

Û beşa duyemîn a pelê, ku tê de ez ji bo Telegram REST API-ya mîkrokûpê çêdikim, da ku heman yekê nekişîne python-telegram-bot ji bo yek rêbazê sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Awayê rast ev e ku hûn hemî lê zêde bikin: TelegramBotSendMessage, TelegramBotHook, TelegramBot - di pêvekê de, têxin depoyek giştî, û wê bidin Çavkaniya Vekirî.

Dema ku me van hemîyan dixwend, nûvekirinên raporta me bi serfirazî têk çûn û di kanalê de peyamek xeletiyek ji min re şandin. Ez ê kontrol bikim ka ew xelet e…

Apache Airflow: Hêsantirkirina ETL
Tiştek di dojka me de şikest! Ma ev ne ya ku em li bendê bûn? Tam!

Ma hûn ê birijînin?

Ma hûn hîs dikin ku min tiştek winda kiriye? Wusa dixuye ku wî soz da ku daneyan ji SQL Serverê veguhezîne Vertica, û dûv re wî ew hilda û ji mijarê dûr ket, pîs!

Ev hovîtî bi qestî bû, min bi tenê neçar ma ku ji we re hin termînolojiyê deşîfre bikim. Niha hûn dikarin bêtir biçin.

Plana me ev bû:

  1. Bikin
  2. Karûbaran çêbikin
  3. Binêrin her tişt çiqas xweş e
  4. Ji bo dagirtin hejmarên danişînê destnîşan bikin
  5. Daneyên ji SQL Server bistînin
  6. Daneyên xwe bixin nav Vertica
  7. Statîstîkan berhev bikin

Ji ber vê yekê, ji bo ku ev hemî rabe û bixebite, min pêvekek piçûk ji me re çêkir docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Li wir em bilind dikin:

  • Vertica wekî mêvandar dwh bi mîhengên herî xwerû,
  • sê mînakên SQL Server,
  • em databasên di paşîn de bi hin daneyan tijî dikin (di tu rewşê de nenêrin mssql_init.py!)

Em bi alîkariya fermanek hinekî tevlihevtir ji ya berê dest pê dikin:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Tiştê ku randomîzatora meya mûcîze çêkiriye, hûn dikarin tiştê bikar bînin Data Profiling/Ad Hoc Query:

Apache Airflow: Hêsantirkirina ETL
Ya sereke ne ew e ku meriv wê ji analîstan re nîşan bide

berfireh kirin danişînên ETL Ez naxwazim, her tişt li wê derê piçûk e: em bingehek çêdikin, tê de nîşanek heye, em her tiştî bi rêveberek çarçovê re dipêçin, û naha em vê yekê dikin:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Dem hatiye daneyên me berhev bikin ji sed û nîv maseyên me. Ka em vê yekê bi alîkariya rêzikên pir bêkêmasî bikin:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Bi alîkariya çengek em ji Airflow distînin pymssql-bihevgirêdan
  2. Werin em sînorkirinek di forma tarîxê de li daxwazê ​​biguhezînin - ew ê ji hêla motora şablonê ve were avêtin nav fonksiyonê.
  3. Daxwaza me têr kirin pandaskî dê me bigire DataFrame - Di pêşerojê de dê ji me re bikêr be.

Ez cîgir bikar tînim {dt} li şûna parametreyek daxwazê %s ne ji ber ku ez Pînokyoyekî xerab im, lê ji ber ku pandas nikare bi dest bixe pymssql û ya dawî diqulipîne params: Listtevî ku ew bi rastî dixwaze tuple.
Her weha bala xwe bidin ku pêşdebir pymssql biryar da ku êdî piştgirî nede wî, û dem dema derketina derve ye pyodbc.

Ka em bibînin ka Airflow argumanên fonksiyonên me bi çi dagirtî:

Apache Airflow: Hêsantirkirina ETL

Ger dane tune be, wê demê tu wateya berdewamkirinê tune. Lê di heman demê de ecêb e ku meriv dagirtina serketî were hesibandin. Lê ev ne xeletiyek e. A-ah-ah, çi bikim?! Û li vir çi ye:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException dê ji Airflow re bêje ku xeletî tune, lê em ji peywirê derdixin. Di navberê de dê çargoşeyek kesk an sor nebe, lê pembe.

Ka em daneyên xwe biavêjin stûnên piralî:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Namely:

  • Databasa ku me ferman jê girt,
  • Nasnameya rûniştina meya lehiyê (ew ê cûda be ji bo her karekî),
  • Hashek ji çavkaniyê û Nasnameya fermanê - da ku di databasa paşîn de (ku her tişt li yek tabloyê tê rijandin) me nasnameyek fermanê ya bêhempa heye.

Pêngava paşîn dimîne: her tiştî biavêjin Vertica. Û, pir ecêb e, yek ji awayên herî balkêş û bikêrhatî ji bo kirina vê yekê bi riya CSV e!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Em wergirê taybet çêdikin StringIO.
  2. pandas dê bi dilovanî me deynin DataFrame di forma CSV- xetên.
  3. Werin em bi çengelê pêwendiyek bi Vertica-ya xweya bijare vekin.
  4. Û niha bi alîkariyê copy() daneyên me rasterast ji Vertika re bişînin!

Em ê ji ajokerê bigirin ka çend xet tije bûne, û ji rêveberê rûniştinê re bêjin ku her tişt baş e:

session.loaded_rows = cursor.rowcount
session.successful = True

Navê pêger.

Di firotanê de, em plakaya armancê bi destan diafirînin. Li vir min destûr da xwe makîneyek piçûk:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Ez bi kar tînim VerticaOperator() Ez nexşeyek databas û tabloyek diafirînim (heke ew jixwe nebin, bê guman). Ya sereke ev e ku meriv pêwendiyan bi rêkûpêk saz bike:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Hilberîn

- Belê, - mişkê biçûk got, - ma ne niha
Ma hûn pê bawer in ku ez heywanê herî xedar ê daristanê me?

Julia Donaldson, The Gruffalo

Ez difikirim ku heke min û hevkarên min pêşbaziyek hebûya: kî dê zû pêvajoyek ETL ji sifrê biafirîne û bide destpêkirin: ew bi SSIS û mişkek xwe û ez bi Airflow re ... Û wê hingê em ê hêsaniya lênihêrînê jî bidin ber hev ... Wow, ez difikirim ku hûn ê bipejirînin ku ez ê wan li hemî eniyan bişkînim!

Ger hinekî ciddîtir, wê hingê Apache Airflow - bi danasîna pêvajoyên di forma koda bernameyê de - karê min kir pir rehettir û xweştir.

Berfirehbûna wê ya bêsînor, hem di warê pêvekan de û hem jî ji hêla pîvanbûnê ve, fersendê dide we ku hûn hema hema li her deverê Airflow bikar bînin: tewra di çerxa tevahî ya berhevkirin, amadekirin û hilberandina daneyan de, tewra di avêtina rokêtan de jî (ji bo Marsê, ji kûrs).

Beşa dawî, referans û agahdarî

Raqeya ku me ji we re berhev kiriye

  • start_date. Erê, ev jixwe memeyek herêmî ye. Bi riya argumana sereke ya Doug start_date hemû derbas dibin. Bi kurtasî, heke hûn tê de diyar bikin start_date roja niha, û schedule_interval - Rojekê, DAG wê sibe ne zûtir dest pê bike.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Û bêtir pirsgirêk tune.

    Çewtiyek din a dema xebitandinê bi wê re têkildar heye: Task is missing the start_date parameter, ku pir caran destnîşan dike ku we ji bîr kir ku hûn bi operatorê dag ve girêdin.

  • Hemî li ser yek makîneyê. Erê, û bingeh (Airflow bixwe û pêlava me), û serverek malperê, û plansazker, û karker. Û ew jî xebitî. Lê bi demê re, hejmara peywirên ji bo karûbaran mezin bû, û gava PostgreSQL dest pê kir ku di 20 deqeyan de li şûna 5 ms bersivê bide navnîşê, me ew girt û hilda.
  • LocalExecutor. Erê, em hê jî li ser rûniştin, û em berê xwe dane ber qurmê. LocalExecutor heta niha têra me kiriye, lê niha dem hatiye ku em bi kêmanî yek xebatkarê xwe berfireh bikin, û ji bo ku em biçin CeleryExecutor em ê pir bixebitin. Û ji ber vê yekê ku hûn dikarin bi wê re li ser yek makîneyê bixebitin, tiştek we nahêle ku Celery tewra li ser serverek bikar bînin, ku "bê guman, dê çu carî nekeve hilberînê, bi rastî!"
  • Ne-bikaranîna amûrên çêkirî:
    • Girêdanên ji bo hilanîna pêbaweriyên karûbarê,
    • SLA Misses ji bo bersivdana karên ku di wextê xwe de nexebitin,
    • xcom ji bo danûstendina metadata (min got metadata!) di navbera karên dag.
  • Mail abuse. Baş e, ez çi bibêjim? Ji bo hemî dubarekirina karên ketî hişyarî hatin danîn. Naha xebata min a Gmail ji Airflow zêdetirî 90 hezar e-nameyên e-nameyê hene, û mêla e-nameyên webê red dike ku di carekê de zêdetirî 100 e-nameyê hilde û jê bibe.

Zehfên bêtir: Apache Airflow Pitfails

Zêdetir amûrên otomatîkê

Ji bo ku em hê bêtir bi serê xwe ne bi destê xwe bixebitin, Airflow ji me re ev amade kiriye:

  • REST API - Hîn jî statûya wî ya Ezmûnî heye, ku nahêle ku wî bixebite. Bi wê re, hûn ne tenê dikarin di derbarê dag û peywiran de agahdarî bistînin, lê di heman demê de dagek rawestînin/dest pê bikin, DAG Run an hewzek biafirînin.
  • CLI - Gelek amûr bi navgîniya rêzika fermanê ve têne peyda kirin ku ne tenê ji karanîna WebUI-yê nerehet in, lê bi gelemperî tune ne. Bo nimûne:
    • backfill ji bo ji nû ve destpêkirina mînakên peywirê hewce ye.
      Mînak analîst hatin û gotin: “Û hevalno, di daneyên 1-13ê Çileyê de tu bêwateyî heye! Rast bikin, rast bikin, rast bikin, rast bikin!" Û tu hobekî wisa yî:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Xizmeta bingehîn: initdb, resetdb, upgradedb, checkdb.
    • run, ku destûrê dide te ku hûn karekî mînakek bimeşînin, û tewra li ser hemî girêdanan jî bixin. Wekî din, hûn dikarin wê bi rê ve bibin LocalExecutor, hetta ku we komikek Celery heye.
    • Hema hema heman tiştî dike test, tenê di binkeyan de jî tiştek nanivîse.
    • connections destûrê dide afirandina girseya girêdanên ji şêlê.
  • python api - rêgezek têkiliyek zehf hişk, ku ji bo pêvekan tête armanc kirin, û ne bi destên piçûk di nav wê de diheje. Lê kî wê rê li me bigire ku em biçin /home/airflow/dags, bireve ipython û dest bi tevliheviyê bikin? Mînakî, hûn dikarin hemî girêdan bi koda jêrîn derxînin:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Girêdana bi metadanûsa Airflow. Ez nivîsandina wê pêşniyar nakim, lê girtina dewletên peywirê ji bo metrîkên cihêreng ên taybetî dikare ji her yek ji API-yê zûtir û hêsantir be.

    Em bêjin ku ne hemî peywirên me bêhêz in, lê carinan dikarin bikevin, û ev normal e. Lê çend astengî jixwe gumanbar in, û pêdivî ye ku were kontrol kirin.

    Hay ji SQL hebin!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

references

Û bê guman, deh girêdanên yekem ên ji weşandina Google-ê naveroka peldanka Airflow ji nîşangirên min in.

Û girêdanên ku di gotarê de têne bikaranîn:

Source: www.habr.com

Ji bo malperên bi parastina DDoS, serverên VPS VDS mêvandariya pêbawer bikirin 🔥 Hostinga malperê ya pêbawer bi parastina DDoS, serverên VPS VDS bikirin | ProHoster