Apache Airflow: Making ETL Easier

Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.

I'm going to tell you about a wonderful tool for building ETL processes: Apache Airflow. Airflow is so versatile and multifaceted that it deserves a closer look even if you don't work with data flows but need to launch processes periodically and monitor their execution.

And yes, I will not only tell but also show: the article has plenty of code, screenshots, and recommendations.

What you usually see when you google the word Airflow / Wikimedia Commons


Introduction

Apache Airflow is just like Django:

  • written in Python,
  • has a great admin panel,
  • infinitely extensible

- only better, and made for completely different purposes, namely (as stated in the intro):

  • running and monitoring tasks on an unlimited number of machines (as many as Celery/Kubernetes and your conscience will allow)
  • with dynamic workflow generation from very easy-to-write and easy-to-understand Python code
  • and the ability to connect any databases and APIs with each other using both ready-made components and home-made plugins (which is extremely simple).

We use Apache Airflow like this:

  • we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into DWH and ODS (for us, that's Vertica and ClickHouse).
  • as an advanced cron that runs the data consolidation processes on the ODS and also keeps an eye on their maintenance.

Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow, this handles:

  • more than 200 DAGs (actually workflows, into which we stuffed our tasks),
  • about 70 tasks in each on average,
  • with this whole thing starting (also on average) once an hour.

I'll write below about how we scaled out, but for now let's define the über-task we'll be solving:

There are three source SQL Servers, each with 50 databases: instances of one project, so they all have the same structure (almost everywhere, mua-ha-ha), which means each of them has an Orders table (fortunately, a table with that name can be shoehorned into any business). We take the data, add service fields (source server, source database, ETL task ID), and naively dump it into, say, Vertica.

Let's go!

Part one: the basics (and a bit of theory)

Why we (and you) need it

When the trees were tall and I was a simple SQL developer at a Russian retailer, we churned out ETL processes, aka data flows, using the two tools available to us:

  • Informatica Power Center: an extremely sprawling system, extremely productive, with its own hardware and its own versioning. I used maybe 1% of its capabilities, at best. Why? Well, first of all, its dated interface put psychological pressure on us. Second, this contraption is designed for extremely fancy processes, furious component reuse, and other very-important-enterprise tricks. And let's not even mention that it costs as much per year as the wing of an Airbus A380.

    Beware: the screenshot may hurt people under 30 a little

  • SQL Server Integration Services: we used this buddy for our intra-project flows. Well, really: we already used SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything about it is good: the interface is pretty, and so are the progress reports... But that's not why we love software products, oh no, not for that. We can version its dtsx files (XML with nodes shuffled on every save), but what's the point? And how about making a task package that drags a hundred tables from one server to another? A hundred, really? Your index finger will fall off after twenty of them from clicking the mouse button. But it definitely looks more fashionable:

We certainly looked for ways out. We even came close to writing our own SSIS package generator...

...and then a new job found me. And there Apache Airflow caught up with me.

When I learned that ETL processes are described in plain Python code, I all but danced for joy. This is how data flows became versionable and diffable, and pouring tables with a single structure from hundreds of databases into one target became a matter of Python code fitting on one and a half or two 13-inch screens.

Assembling the cluster

Let's not turn this into a kindergarten by talking about perfectly obvious things like installing Airflow, your chosen database, Celery, and the other matters described in the docs.

So that we can start experimenting right away, I sketched out a docker-compose.yml in which:

  • We bring up Airflow itself: the Scheduler and the Webserver. Flower will also spin there to monitor Celery tasks (because it is already baked into apache/airflow:1.10.10-python3.7, and we don't mind);
  • PostgreSQL, into which Airflow will write its service information (scheduler data, execution statistics, etc.), and where Celery will mark completed tasks;
  • Redis, which will act as the task broker for Celery;
  • Celery workers, which will do the actual execution of tasks.
  • We will put our DAG description files into the ./dags folder. They are picked up on the fly, so there's no need to restart the whole stack after every sneeze.

In some places the code in the examples is not shown in full (so as not to clutter the text), and in some places it is modified along the way. Complete working code examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • In assembling the compose file, I relied heavily on the well-known puckel/docker-airflow image; be sure to check it out. Maybe you won't need anything else in your life.
  • All Airflow settings are available not only through airflow.cfg but also through environment variables (kudos to the developers), which I shamelessly took advantage of.
  • Naturally, this is not production-ready: I deliberately didn't set up healthchecks on the containers and didn't bother with security. But I did the minimum needed for our experiments.
  • Note that:
    • The folder with the DAGs must be accessible to both the scheduler and the workers.
    • The same applies to all third-party libraries: they must all be installed on the machines running the scheduler and the workers.
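The environment-variable route follows a fixed naming scheme: option key in section [section] of airflow.cfg is overridden by AIRFLOW__{SECTION}__{KEY}, which is exactly how the compose file sets, say, AIRFLOW__CORE__EXECUTOR. A small sketch (the helper names are ours, not part of Airflow) that builds such a block from a nested dict:

```python
def airflow_env_var(section: str, key: str) -> str:
    """Return the environment variable name that overrides [section] key in airflow.cfg."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def to_env(config: dict) -> dict:
    """Flatten {section: {key: value}} into {AIRFLOW__SECTION__KEY: str(value)}."""
    env = {}
    for section, options in config.items():
        for key, value in options.items():
            # Environment variables are always strings; booleans become 'True'/'False'
            env[airflow_env_var(section, key)] = str(value)
    return env


env = to_env({
    "core": {"executor": "CeleryExecutor", "load_examples": False},
    "celery": {"broker_url": "redis://broker:6379/0"},
})
print(env["AIRFLOW__CORE__EXECUTOR"])  # CeleryExecutor
```

The output of to_env() could be pasted into an environment: section like the x-airflow-config block above.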

Well, now it's simple:

$ docker-compose up --scale worker=3

Once everything is up, you can look at the web interfaces:

  • Airflow: http://127.0.0.1:8080/
  • Flower: http://127.0.0.1:5555/

Basic concepts

If you didn't understand a thing in all these "dags", here is a short glossary:

  • Scheduler: the most important uncle in Airflow, making sure that robots do the hard work, not humans. It watches the schedule, updates DAGs, and launches tasks.

    In older versions it had memory problems (no, not amnesia, but leaks), and the configs even still contain a legacy parameter, run_duration, which is its restart interval. But now everything is fine.

  • DAG (aka "dag"): a "directed acyclic graph", but that definition won't tell most people much. In practice it is a container for tasks that interact with each other (see below), an analogue of a Package in SSIS or a Workflow in Informatica.

    Besides DAGs there are also subdags, but we most likely won't get to them.

  • DAG Run: an initialized DAG that has been assigned its own execution_date. Several DAG runs of the same DAG can run in parallel (provided, of course, that you made your tasks idempotent).
  • Operator: a piece of code responsible for performing a specific action. There are three types of operators:
    • action, like our beloved PythonOperator, which can execute any (valid) Python code;
    • transfer, which moves data from place to place, say, MsSqlToHiveTransfer;
    • sensor, which lets you react to an event or hold back further execution of the DAG until that event happens. HttpSensor can poll a given endpoint and, once the desired response arrives, start the GoogleCloudStorageToS3Operator transfer. An inquisitive mind will ask: "Why? You could just do the retries right in the operator!" The answer: so as not to clog the task pool with stuck operators. In reschedule mode, a sensor starts, checks, and exits until the next attempt instead of holding a worker slot.
  • Task: declared operators, regardless of their type, become tasks once they are attached to a DAG.
  • Task instance: when the scheduler decides it is time to send tasks into battle on the workers (right on the spot if we use LocalExecutor, or to a remote node with CeleryExecutor), it assigns them a context (that is, a set of variables: the execution parameters), expands the command or query templates, and puts them into the pool.
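To make "directed acyclic graph" less abstract, here is a toy model built on the standard library's graphlib (Python 3.9+), not on Airflow; the task names are made up. A task may start only after all of its upstream tasks are done, so the tasks fall into waves that could run in parallel, and a cycle would make the graph impossible to schedule:

```python
from graphlib import CycleError, TopologicalSorter

# Each task maps to the set of tasks it depends on (its upstream tasks).
dag = {
    "extract_orders": set(),
    "extract_clients": set(),
    "load_dwh": {"extract_orders", "extract_clients"},
    "notify": {"load_dwh"},
}

sorter = TopologicalSorter(dag)
sorter.prepare()
waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # tasks whose upstreams are all done
    waves.append(ready)                 # these could run in parallel
    sorter.done(*ready)

print(waves)
# [['extract_clients', 'extract_orders'], ['load_dwh'], ['notify']]

# A cycle makes the graph unschedulable, which is why DAGs must be acyclic.
try:
    TopologicalSorter({"a": {"b"}, "b": {"a"}}).prepare()
except CycleError:
    print("cycle detected")
```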

Creating tasks

First, let's describe the general layout of our DAG, and then we'll dive deeper into the details, since we use a few non-trivial solutions.

So, in its simplest form, such a DAG looks like this:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's figure it out:

  • First, we import the libs we need and a few other things;
  • sql_server_ds is a List[namedtuple[str, str]] with the names of connections from Airflow Connections and the databases from which we will take our table;
  • dag is the declaration of our DAG, which must be in globals(), otherwise Airflow won't find it. The DAG also needs to be told:
    • that its name is orders: this name will then appear in the web interface,
    • that it will run starting from midnight on July 8th,
    • and that it should run every 6 hours (for tough guys, a cron string such as 0 */6 * * * works instead of timedelta(); for the less tough, a preset like @daily);
  • workflow() will do the main job, but not now. For now, it will just dump our context into the log.
  • And now the simple magic of creating tasks:
    • we loop over our sources;
    • we initialize a PythonOperator, which will execute our dummy workflow(). Don't forget to give the task a unique (within the DAG) name and to attach the DAG itself. The provide_context flag, in turn, pours additional arguments into the function, which we carefully collect with **context.
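The commons.datasources module is not shown in the article. Here is a hypothetical sketch of what sql_server_ds might contain for three servers with 50 databases each. The connection IDs (mssql_0 to mssql_2) and the database naming scheme are assumptions for illustration only:

```python
from collections import namedtuple
from typing import List

# Hypothetical: one Airflow connection per source SQL Server.
SERVERS = ["mssql_0", "mssql_1", "mssql_2"]
DATABASES_PER_SERVER = 50

DataSource = namedtuple("DataSource", "conn_id schema")

# Each entry names the Airflow connection and the database to read Orders from.
# Database names are made unique across servers because they double as task_id.
sql_server_ds: List[DataSource] = [
    DataSource(conn_id, f"{conn_id}_db{n:02d}")
    for conn_id in SERVERS
    for n in range(DATABASES_PER_SERVER)
]

print(len(sql_server_ds))  # 150
print(sql_server_ds[0])    # DataSource(conn_id='mssql_0', schema='mssql_0_db00')
```

Namedtuples are tuples, so the for conn_id, schema in sql_server_ds loop in the DAG unpacks them directly. Three servers times 50 databases gives 150 entries, one task each.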

That's all for now. Here's what we got:

  • a new DAG in the web interface,
  • a hundred and fifty tasks that will be executed in parallel (if the Airflow, Celery, and server capacity settings allow it).

Well, almost.

Who will install the dependencies?

To make this whole thing easier, I wired the processing of requirements.txt on all nodes into docker-compose.yml.

Now here we go:

Gray squares are task instances processed by the scheduler.

We wait a bit, and the tasks are snapped up by the workers:

The green ones, of course, finished their work successfully. The red ones, not so much.

By the way, there is no ./dags folder on our production setup and no synchronization between machines: all DAGs live in git on our GitLab, and GitLab CI distributes updates to the machines on every merge into master.

A little about Flower

While the workers are crunching our dummy tasks, let's recall another tool that can show us something: Flower.

The very first page, with summary information on the worker nodes:

The most intense page, with the tasks that went to work:

The most boring page, with the status of our broker:

The brightest page, with task status graphs and their execution times:

Reloading what didn't load

So, all the tasks have finished, and we can carry away the wounded.

And there turned out to be quite a few wounded, for one reason or another. When Airflow is used correctly, these very squares mean that the data definitely did not arrive.

You need to check the log and restart the failed task instances.

By clicking on any square, we see the actions available to us:

You can take a failed one and Clear it. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

Clearly, doing this with the mouse for every red square is not very humane; that's not what we expect from Airflow. Naturally, we have a weapon of mass destruction: Browse/Task Instances

Let's select everything at once and reset it to zero by clicking the right item:

After clearing, our tasks look like this (they can't wait for the scheduler to schedule them):

Connections, hooks and other variables

It's time to look at the next DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Good sirs, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         ΠΠ°Ρ‚Π°Ρˆ, просыпайся, ΠΌΡ‹ {{ dag.dag_id }} ΡƒΡ€ΠΎΠ½ΠΈΠ»ΠΈ
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Everybody has updated reports at some point, right? Here it is again: there is a list of sources to get the data from; there is a list of places to put it; don't forget to honk when everything worked or broke (well, that's not about us, no).

Let's go through the file again and look at the new obscure stuff:

  • from commons.operators import TelegramBotSendMessage: nothing prevents us from writing our own operators, which we took advantage of by making a small wrapper for sending messages to Telegram. (We'll talk more about this operator below);
  • default_args={}: a DAG can hand out the same arguments to all of its operators;
  • to='{{ var.value.all_the_kings_men }}': the to field is not hardcoded but generated dynamically using Jinja and a variable with a list of emails, which I carefully put into Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS: the condition for launching the operator. In our case, the email flies to the bosses only if all dependencies completed successfully;
  • tg_bot_conn_id='tg_main': conn_id arguments accept the IDs of connections that we create in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED: the Telegram message flies out only if some tasks failed;
  • task_concurrency=1: we forbid running several task instances of the same task at once. Otherwise, we could get several VerticaOperator runs at the same time (all looking at the same table);
  • report_update >> [email, tg]: all VerticaOperator tasks converge on sending the email and the message, like this:

    But since the notifier operators have different launch conditions, only one of them will run. In the Tree View, everything looks a bit less obvious:
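A toy model of how the two trigger rules in update_reports decide which notifier runs once all upstream VerticaOperator tasks have finished. This is a simplification for illustration, not Airflow's code: real Airflow also tracks states such as skipped and upstream_failed, and one_failed fires as soon as the first upstream task fails rather than waiting for the rest.

```python
from typing import Iterable, List

SUCCESS, FAILED = "success", "failed"


def all_success(upstream_states: Iterable[str]) -> bool:
    """Simplified ALL_SUCCESS: run only if every upstream task succeeded."""
    return all(state == SUCCESS for state in upstream_states)


def one_failed(upstream_states: Iterable[str]) -> bool:
    """Simplified ONE_FAILED: run if at least one upstream task failed."""
    return any(state == FAILED for state in upstream_states)


def notifiers_to_run(upstream_states: Iterable[str]) -> List[str]:
    states = list(upstream_states)
    rules = {"email_success": all_success, "telegram_fail": one_failed}
    return [task for task, rule in rules.items() if rule(states)]


print(notifiers_to_run([SUCCESS] * 5))             # ['email_success']
print(notifiers_to_run([SUCCESS] * 4 + [FAILED]))  # ['telegram_fail']
```

For a non-empty set of finished upstream tasks, the two conditions are mutually exclusive, so exactly one notifier is executed.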

A few words about macros and their friends, variables.

Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} will expand to the value of the context variable execution_date in the YYYY-MM-DD format: 2020-07-14. The best part is that context variables are pinned to a specific task instance (a square in the Tree View), so on restart the placeholders expand to the same values.
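Roughly, {{ ds }} and its sibling {{ ds_nodash }} just format the task instance's execution_date. Here is a plain-Python approximation, not Airflow's Jinja environment, applied to the query above:

```python
from datetime import datetime


def render_date_macros(execution_date: datetime) -> dict:
    """Approximate the {{ ds }} and {{ ds_nodash }} macros for one task instance."""
    ds = execution_date.strftime("%Y-%m-%d")
    return {"ds": ds, "ds_nodash": ds.replace("-", "")}


# The execution_date belongs to the task instance, not to the moment it runs,
# so clearing and rerunning the instance renders exactly the same query.
execution_date = datetime(2020, 7, 14)
query = "WHERE payment_dtm::DATE = '{ds}'::DATE".format(**render_date_macros(execution_date))
print(query)  # WHERE payment_dtm::DATE = '2020-07-14'::DATE
```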

The rendered values can be viewed with the Rendered button on each task instance. Here is how the email task looks:

And here is the task that sends the message:

A complete list of the built-in macros for the latest available version is here: Macros Reference

Moreover, with the help of plugins we can declare our own macros, but that's a completely different story.

In addition to the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of them in Admin/Variables:

That's it, now you can use them:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can be JSON. In the case of JSON:

bot_config

{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}

just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
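Conceptually, {{ var.json.bot_config.bot.token }} parses the variable's JSON value and then follows the keys after the variable name. Here is a plain-Python sketch of that lookup. The variables dict stands in for Admin/Variables and is an assumption, not Airflow's storage, and the failures_chat value is made up:

```python
import json
from functools import reduce

# Stand-in for Admin/Variables: variable name -> stored string value.
variables = {
    "failures_chat": "-1001234567890",  # made-up chat id
    "bot_config": json.dumps({"bot": {"token": "881hskdfASDA16641", "name": "Verter"},
                              "service": "TG"}),
}


def var_value(name: str) -> str:
    """Like {{ var.value.<name> }}: the raw stored string."""
    return variables[name]


def var_json(path: str):
    """Like {{ var.json.<name>.<key>... }}: parse the JSON, then follow the keys."""
    name, *keys = path.split(".")
    return reduce(lambda node, key: node[key], keys, json.loads(variables[name]))


print(var_json("bot_config.bot.token"))  # 881hskdfASDA16641
```

This is also why the token has to be a quoted string: an unquoted value like 881hskdfASDA16641 is not valid JSON and would fail to parse.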

I'll literally say one word and show one screenshot about connections. Everything is elementary here: on the Admin/Connections page we create a connection and add our logins/passwords and more specific parameters. Like this:

Passwords can be encrypted (more thoroughly than in the default setup), and you can leave the connection type empty (as I did for tg_main). The thing is, the list of types is hardwired into the Airflow models and cannot be extended without getting into the source code (if I failed to google something, please correct me), but nothing stops us from getting credentials simply by name.

You can also create several connections with the same name. In this case, the BaseHook.get_connection() method, which fetches connections by name, will return a random one of the namesakes (Round Robin would be more logical, but let's leave that on the conscience of the Airflow developers).
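The difference between the two strategies is easy to see in plain Python. This is a sketch with made-up connection records, not Airflow's Connection model. random.choice imitates the random pick described above, and itertools.cycle gives a simple round robin:

```python
import itertools
import random

# Hypothetical connections sharing one conn_id but pointing at different hosts.
connections = [
    {"conn_id": "dwh", "host": "vertica-1"},
    {"conn_id": "dwh", "host": "vertica-2"},
    {"conn_id": "dwh", "host": "vertica-3"},
]


def pick_random(conns):
    """Any namesake may be returned; repeats and uneven load are possible."""
    return random.choice(conns)


def round_robin(conns):
    """Endless iterator handing out namesakes in a fixed rotating order."""
    return itertools.cycle(conns)


rr = round_robin(connections)
print([next(rr)["host"] for _ in range(4)])
# ['vertica-1', 'vertica-2', 'vertica-3', 'vertica-1']
```

Each task instance runs in its own process, so a real round robin would need shared state (for example, a counter in a database). An in-process iterator like this only rotates within one process, which may be part of why a random pick is the pragmatic default.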

Variables and connections are certainly cool tools, but it's important to keep the balance between the parts of your flows you keep in the code itself and the parts you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, such as a mailing address, through the UI. On the other hand, this is still a return to the mouse clicking that we (well, I) wanted to get rid of.

Working with connections is one of the jobs of hooks. In general, Airflow hooks are the points where it connects to third-party services and libraries. For example, JiraHook opens a client for interacting with Jira (you can move tasks back and forth), and with SambaHook you can push a local file to an SMB share.

Dissecting the custom operator

And so we've come close to looking at how TelegramBotSendMessage is made.

The commons/operators.py code with the operator itself:

from typing import Union

from airflow.models import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, as with everything in Airflow, it's all very simple:

  • We inherited from BaseOperator, which implements quite a few Airflow-specific things (look at them at your leisure);
  • We declared template_fields, the fields in which Jinja will look for macros to process;
  • We arranged the right arguments for __init__() and set defaults where needed;
  • We didn't forget to initialize the ancestor either;
  • We opened the corresponding TelegramBotHook and got a client object from it;
  • We overrode the BaseOperator.execute() method, which Airflow calls when the time comes to run the operator: that's where we implement the main action, not forgetting to log. (By the way, we log straight to stdout and stderr: Airflow intercepts everything, wraps it nicely, and puts it where it belongs.)
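What template_fields buys us: before execute() runs, Airflow renders every attribute listed there with Jinja against the task's context. Below is a stdlib-only imitation of that step, with a regex substitution instead of Jinja and a hypothetical ToyOperator class, to show the mechanism. It is not how Airflow implements it internally:

```python
import re

PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render(template: str, context: dict) -> str:
    """Replace {{ a.b.c }} with the value found at context['a']['b']['c']."""
    def lookup(match):
        value = context
        for key in match.group(1).split("."):
            value = value[key]
        return str(value)
    return PLACEHOLDER.sub(lookup, template)


class ToyOperator:
    template_fields = ["chat_id", "message"]

    def __init__(self, chat_id, message):
        self.chat_id = chat_id
        self.message = message

    def render_fields(self, context):
        # Only the attributes named in template_fields are rendered.
        for field in self.template_fields:
            setattr(self, field, render(getattr(self, field), context))


op = ToyOperator(chat_id="{{ var.value.failures_chat }}",
                 message="{{ dag.dag_id }} failed :(")
op.render_fields({"var": {"value": {"failures_chat": "-100123"}},
                  "dag": {"dag_id": "update_reports"}})
print(op.chat_id, op.message)  # -100123 update_reports failed :(
```

A field that is not listed in template_fields would keep its {{ ... }} text verbatim, which is a common reason a macro "doesn't work" in a custom operator.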

Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don't even know what to explain here, so I'll just note the important points:

  • We inherit and think about the arguments: in most cases there will be just one, conn_id;
  • We override the standard methods: I limited myself to get_conn(), in which I get the connection parameters by name and simply take the extra section (a JSON field), where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • I create an instance of our TelegramBot, passing it that token.

That's all. You can get a client from the hook with TelegramBotHook().client or TelegramBotHook().get_conn().
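A connection's Extra field is stored as a JSON string, and extra_dejson is the parsed dict the hook reads bot_token from. Here is a stdlib sketch using a hypothetical FakeConnection class. It only shows the shape of the data and a friendlier error when the key is missing. The fallback to {} for empty or invalid JSON is this sketch's choice:

```python
import json
from dataclasses import dataclass


@dataclass
class FakeConnection:
    """Hypothetical stand-in for an Airflow connection record."""
    conn_id: str
    extra: str = ""

    @property
    def extra_dejson(self) -> dict:
        # Parse the Extra JSON; treat an empty or invalid field as "no extras".
        try:
            return json.loads(self.extra) if self.extra else {}
        except json.JSONDecodeError:
            return {}


def bot_token_from(conn: FakeConnection) -> str:
    try:
        return conn.extra_dejson["bot_token"]
    except KeyError:
        raise ValueError(f"Connection {conn.conn_id!r} has no bot_token in Extra") from None


conn = FakeConnection("tg_main", '{"bot_token": "YOuRAwEsomeBOtToKen"}')
print(bot_token_from(conn))  # YOuRAwEsomeBOtToKen
```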

And the second part of the file, in which I make a micro-wrapper for the Telegram REST API so as not to drag in the whole python-telegram-bot for the sake of a single sendMessage method.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Txoj kev raug yog ntxiv nws tag nrho: TelegramBotSendMessage, TelegramBotHook, TelegramBot - rau hauv ib lub plugin, muab tso rau hauv ib lub chaw khaws ntaub ntawv pej xeem, thiab muab rau Open Source.

Thaum peb tab tom kawm tag nrho cov no, peb cov ntawv tshaj tawm tshiab tau tswj tsis tau tiav thiab xa cov lus yuam kev rau kuv channel. Kuv mam mus xyuas seb qhov twg tsis zoo ntxiv lawm...

Apache Airflow: Ua ETL yooj yim dua
Ib yam dab tsi tawg hauv peb dag! Qhov no tsis yog qhov peb tau tos? Raws nraim!

Koj puas yuav nchuav nws?

Koj puas xav tias kuv nco ib yam dab tsi? Zoo li nws tau cog lus tias yuav hloov cov ntaub ntawv los ntawm SQL Server mus rau Vertica, thiab tom qab ntawd nws coj nws thiab tawm ntawm lub ncauj lus, koj tus neeg phem!

Qhov kev ua txhaum cai no yog txhob txwm ua, kuv tsuas yog yuav tsum tau txiav txim siab qee cov ntsiab lus rau koj. Tam sim no koj tuaj yeem txav mus ntxiv.

Peb lub hom phiaj yog qhov no:

  1. Create a dag
  2. Generate the tasks
  3. See how beautiful everything is
  4. Assign session numbers to the loads
  5. Get the data from SQL Server
  6. Put the data into Vertica
  7. Collect statistics

So, to make all of this happen, I added a little extra to our docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Here we bring up:

  • Vertica as the dwh host, with mostly default settings,
  • three SQL Server instances,
  • and we fill the latter's databases with some data (under no circumstances look into mssql_init.py!)

We launch everything with a slightly more complex command than last time:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

What our miraculous randomizer generated can be inspected with the Data Profiling/Ad Hoc Query menu item:

The main thing is not to show this to the analysts

I won't go into detail about ETL sessions, everything is trivial there: we create a database with a table in it, wrap everything in a context manager, and now we do this:

with Session(connection, task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # exc_type is None when the with-block finished without an exception
        if exc_type is not None:
            self.successful = False
            # truncate to fit the VARCHAR(500) comment column
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'[:500]
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

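    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            # statements without a result set (e.g. CREATE TABLE) return nothing
            if cursor.description is None:
                return None
            return cursor.fetchone()[0]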

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass
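
The context-manager idea behind Session can be tried without PostgreSQL. Below is a minimal sketch of a hypothetical SqliteSession that uses the standard-library sqlite3 module in place of the psycopg2-style connection, SERIAL and TIMESTAMPTZ of the original. The pattern is the same: insert a row on entry, record the outcome on exit, and let any exception propagate so the Airflow task still fails.

```python
import sqlite3


class SqliteSession:
    """Hypothetical SQLite-backed stand-in for Session (illustration only)."""

    def __init__(self, connection, task_name):
        self.connection = connection
        self.task_name = task_name
        self.id = None
        self.successful = None
        self.loaded_rows = None
        self.comment = None
        self.connection.execute(
            """CREATE TABLE IF NOT EXISTS sessions (
                   id          INTEGER PRIMARY KEY AUTOINCREMENT,
                   task_name   TEXT NOT NULL,
                   started     TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                   finished    TEXT,
                   successful  INTEGER,
                   loaded_rows INTEGER,
                   comment     TEXT)""")

    def __enter__(self):
        # open the session: one row per load, finished stays NULL for now
        cur = self.connection.execute(
            'INSERT INTO sessions (task_name) VALUES (?)', (self.task_name,))
        self.id = cur.lastrowid
        self.connection.commit()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # any exception inside the block marks the session as failed
            self.successful = False
            self.comment = f'{exc_type.__name__}: {exc_val}'
        self.connection.execute(
            """UPDATE sessions
               SET finished = CURRENT_TIMESTAMP, successful = ?,
                   loaded_rows = ?, comment = ?
               WHERE id = ?""",
            (self.successful, self.loaded_rows, self.comment, self.id))
        self.connection.commit()
        return False  # do not swallow the exception
```

Usage mirrors the snippet above: `with SqliteSession(sqlite3.connect(':memory:'), 'orders') as session: ...`. Returning False from `__exit__` is the deliberate choice here. The failure is recorded, but Airflow still sees the exception.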

The time has come to pull our data out of our hundred and fifty tables. Let's do it with a few very simple lines:

import pandas as pd
from airflow.hooks.mssql_hook import MsSqlHook

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)

  1. Using a hook, we get a pymssql connection from Airflow.
  2. We substitute a date restriction into the query; the template engine passes the date into the function.
  3. We hand the query to pandas, which gives us a DataFrame; it will come in handy later.

I use the {dt} substitution instead of a %s query parameter not because I'm an evil Pinocchio, but because pandas can't cope with pymssql and passes it params as a List, even though it really wants a tuple.
Also note that the pymssql developer decided to stop supporting it, so it's time to move to pyodbc.
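
Since {dt} goes straight into the SQL text, a cheap safeguard is to parse it as a date first, so that only a well-formed date can ever reach the f-string. The value normally comes from Airflow's {{ ds }} template (YYYY-MM-DD). The function name build_orders_query is hypothetical, a sketch of the idea rather than the article's code.

```python
from datetime import date


def build_orders_query(dt: str) -> str:
    """Render the one-day orders query, accepting only a real ISO date."""
    # date.fromisoformat raises ValueError for anything that is not an ISO date,
    # and isoformat() normalises the value back to YYYY-MM-DD
    day = date.fromisoformat(dt)
    return f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{day.isoformat()}'
    """
```

An injected string such as `"2020-07-07'; DROP TABLE dbo.Orders; --"` fails at the parsing step and never reaches SQL Server.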

Let's see what Airflow stuffed into our function's arguments:


If there is no data, there is no point in continuing. But it would be strange to count the load as successful, and it isn't an error either. Argh, what to do?! Here's what:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that there is actually no error, we are just skipping the task. The interface will show neither a green nor a red square, but a pink one.
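
The three outcomes can be modelled in a few lines. This is a toy sketch, not Airflow's scheduler: SkipTask is a hypothetical stand-in for airflow.exceptions.AirflowSkipException, and retries are ignored. It shows why the skip exception has to be told apart from ordinary errors.

```python
class SkipTask(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowSkipException."""


def run_task(task, *args):
    """Toy model of how a task's outcome maps to its state (retries ignored)."""
    try:
        task(*args)
    except SkipTask:
        return 'skipped'  # the pink square
    except Exception:
        return 'failed'   # the red square
    return 'success'      # the green square


def load(rows):
    """Mirrors the `if df.empty` check: nothing to load is not an error."""
    if not rows:
        raise SkipTask('No rows to load')
    return len(rows)
```

The order of the except clauses matters: the skip exception is caught before the generic Exception, otherwise an empty load would be reported as a failure.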

Let's add a few columns to our data:

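from pandas.util import hash_pandas_object

df['etl_source'] = src_schema
df['etl_id'] = session.id
# index=False: hash only the (source, id) values, not the row position
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']], index=False)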

Namely:

  • the database we took the orders from,
  • the ID of our load session (it will differ for every task),
  • a hash of the source and the order ID, so that in the final database (where everything is poured into a single table) we have a unique order identifier.
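
hash_pandas_object returns unsigned 64-bit values, and by default it also mixes the row index into the hash. A target column declared as a signed 64-bit INT may not hold every unsigned value. Below is a sketch of an alternative (not the article's method): a deterministic hash of (source, id) with hashlib.blake2b, read as a signed 64-bit integer. The function name order_hash and the '|' separator are assumptions.

```python
import hashlib


def order_hash(etl_source: str, order_id: int) -> int:
    """Deterministic signed 64-bit hash of (source, order id)."""
    key = f'{etl_source}|{order_id}'.encode('utf-8')
    digest = hashlib.blake2b(key, digest_size=8).digest()
    # interpreting the 8 bytes as signed keeps the value within a signed 64-bit INT
    return int.from_bytes(digest, 'big', signed=True)
```

Unlike Python's built-in hash(), which is salted per process for strings, this value is the same on every worker and every run. That is what a cross-load identifier needs.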

One last step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do this is via CSV!

import csv
from io import StringIO

from airflow.contrib.hooks.vertica_hook import VerticaHook

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

# the column list must be plain SQL ("id, start_time, ..."), not a Python list repr
copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)

  1. We create a special buffer, StringIO.
  2. pandas kindly writes our DataFrame into it as CSV lines.
  3. We open a connection to our beloved Vertica using the hook.
  4. And now, with copy(), we send our data straight to Vertica!
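
The buffer and COPY statement pair can also be built with the standard csv module alone, which makes it easy to inspect exactly what Vertica will receive. Below is a sketch with a hypothetical helper, to_copy_payload. It keeps the article's conventions: '|' delimiter, '"' enclosure, 'NUL' as the NULL marker. The returned pair would go to the vertica_python cursor as `cursor.copy(copy_stmt, buffer)`, as above.

```python
import csv
import io


def to_copy_payload(table, columns, rows):
    """Build a Vertica COPY ... FROM STDIN statement and an in-memory CSV buffer."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, delimiter='|', quotechar='"',
                        quoting=csv.QUOTE_MINIMAL, lineterminator='\n')
    for row in rows:
        # None becomes the NULL marker declared in the COPY statement
        writer.writerow(['NUL' if value is None else value for value in row])
    buffer.seek(0)  # rewind so the driver reads from the beginning

    copy_stmt = (f"COPY {table}({', '.join(columns)}) "
                 "FROM STDIN "
                 "DELIMITER '|' "
                 "ENCLOSED '\"' "
                 "ABORT ON ERROR "
                 "NULL 'NUL'")
    return copy_stmt, buffer
```

A value containing the delimiter, such as 'a|b', is written as "a|b", which is why the statement declares the '"' enclosure. Note the trade-off of a text NULL marker: a genuine string 'NUL' would also be loaded as NULL.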

We ask the driver how many rows were loaded and tell the session manager that everything is OK:

session.loaded_rows = cursor.rowcount
session.successful = True

That's it.

In production we create the target table manually. Here I allowed myself a little automation:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

With VerticaOperator() I create a database schema and a table (if they don't exist yet, of course). The main thing is to set up the dependencies correctly:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load
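
The loop produces a fan-out: create_target runs first, and then every per-source load depends only on it, so the loads can run in parallel. The sketch below models that shape with the standard-library graphlib (Python 3.9+). It is not Airflow code. The (conn_id, schema) pairs are hypothetical stand-ins for sql_server_ds.

```python
from graphlib import TopologicalSorter  # Python 3.9+


def execution_waves(sources):
    """Group task ids into waves; each wave depends only on earlier waves."""
    # task_id -> set of upstream task_ids, mirroring `create_table >> load`
    graph = {schema: {'create_target'} for _conn_id, schema in sources}
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything whose upstreams are done
        waves.append(ready)
        sorter.done(*ready)
    return waves
```

For `[('mssql_0', 'orders_0'), ('mssql_1', 'orders_1')]` this yields two waves: `create_target` alone, then both loads together. That matches what the scheduler can do with the dag above, given enough worker slots.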

Summing up

"Well," said the mouse, "isn't it clear now?
Are you convinced that I'm the most terrifying creature in the wood?"

Julia Donaldson, "The Gruffalo"

I think that if my colleagues and I held a competition to see who could build and launch an ETL process from scratch the fastest, they with their SSIS and a mouse and me with Airflow... and then also compared how easy it is to maintain... Wow, I think you'll agree that I'd beat them on every front!

On a slightly more serious note, Apache Airflow, by describing processes as program code, has made my work much more convenient and enjoyable.

Its unlimited extensibility, both through plugins and through its built-in scalability, lets you use Airflow in almost any area: the full cycle of collecting, preparing and processing data, or even launching rockets (to Mars, of course).

The final part: reference and information

The rakes we stepped on for you

  • start_date. Yes, this is already a local meme. Everyone trips over the dag's main argument, start_date. In short, if you set start_date to the current date and schedule_interval to one day, the DAG will start no earlier than tomorrow. Use a fixed date instead:
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    And no more problems.

    Another runtime error is related to it: Task is missing the start_date parameter, which most often means you forgot to bind the dag to the operator.

  • Everything on one machine. Yes: the databases (Airflow's own and the one for our glue layer), the web server, the scheduler and the workers. And it even worked. But over time the number of tasks for the services grew, and when PostgreSQL started answering an index lookup in 20 s instead of 5 ms, we picked it up and moved it elsewhere.
  • LocalExecutor. Yes, we're still running on it, and we've already reached the edge of the abyss. LocalExecutor has been enough for us so far, but now it's time to expand with at least one more worker, and we'll have to work hard to move to CeleryExecutor. And given that you can run it on a single machine, nothing stops you from using Celery even on a server that "of course will never go into production, honestly!"
  • Not using the built-in tools:
    • Connections for storing service credentials,
    • SLA Misses for reacting to tasks that didn't finish on time,
    • XCom for exchanging metadata (I said metadata!) between dag tasks.
  • Mail abuse. Well, what can I say? Alerts were set up for every retry of every failed task. Now my work Gmail holds over 90k emails from Airflow, and the web mail interface refuses to select and delete more than 100 at a time.
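
The start_date surprise follows from how a run is labelled versus when it is triggered. Below is a simplified model for a timedelta schedule_interval, ignoring cron alignment, catchup and time zones. first_runs is a hypothetical helper, not an Airflow API. Each run carries the start of its interval as execution_date and fires only once that interval has ended.

```python
from datetime import datetime, timedelta


def first_runs(start_date, schedule_interval, n=3):
    """(execution_date, earliest trigger time) for the first n scheduled runs.

    Simplified model: a run is labelled with the start of its interval and
    is triggered only after the whole interval has passed.
    """
    return [(start_date + i * schedule_interval,
             start_date + (i + 1) * schedule_interval)
            for i in range(n)]
```

With `first_runs(datetime(2020, 7, 7, 0, 1, 2), timedelta(days=1), n=2)`, the run labelled 2020-07-07 00:01:02 fires at 2020-07-08 00:01:02. So a start_date of "today" with a daily interval means nothing happens until tomorrow.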

More pitfalls: Apache Airflow Pitfalls

Tools for even more automation

So that we work even more with our heads and less with our hands, Airflow has prepared the following for us:

  • REST API: it still has Experimental status, which doesn't stop it from working. With it you can not only get information about dags and tasks, but also stop/start a dag, create a DAG Run or a pool.
  • CLI: many tools are available from the command line that are not just inconvenient to use through the WebUI, but missing there altogether. For example:
    • backfill is needed to re-run task instances.
      For example, the analysts come and say: "Your data, comrade, is nonsense from January 1 to January 13! Fix it, fix it, fix it, fix it!" And you go:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Database maintenance: initdb, resetdb, upgradedb, checkdb.
    • run, which lets you run a single task instance, even ignoring all of its dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
    • test does roughly the same, but writes nothing to the database.
    • connections lets you create connections in bulk from the shell.
  • Python API: a rather hardcore way of interacting, intended for plugins rather than for tinkering by hand. But who's going to stop us from going to /home/airflow/dags, launching ipython and messing around? You can, for example, export all connections with this code:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connecting to the Airflow metadata database. I don't recommend writing to it, but reading task states for various specific metrics can be much faster and simpler than through any of the APIs.

    Let's say not all of our tasks are idempotent, and they can occasionally fail, which is normal. But several failures in a row are already suspicious and worth checking.

    Careful, SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0
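
The core of the query is the streak logic: count only the failures that form an unbroken run starting from the newest execution. In the last_executions numbering, a failure belongs to the streak when its overall rank rn equals its rank among failures. Below is a sketch of the same rule in plain Python for one task's history. failure_streak is a hypothetical helper, and the input is assumed to be already ordered newest first.

```python
FAILING = {'failed', 'up_for_retry'}


def failure_streak(states_newest_first):
    """Count the leading run of failing states, split by state."""
    counts = {'unsuccessful': 0, 'failed': 0, 'up_for_retry': 0}
    for state in states_newest_first:
        if state not in FAILING:
            break  # the streak ends at the first non-failing execution
        counts['unsuccessful'] += 1
        counts[state] += 1
    return counts
```

For `['up_for_retry', 'failed', 'success', 'failed']` the streak is two executions long (one failed, one up_for_retry). The older failure is not counted, just as the `rn = row_number()` comparison excludes it in SQL.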

References

And, naturally, the first ten links in the Google results are the contents of the Airflow folder in my bookmarks.

And the links used in the article:

Source: www.hab.com