Apache Airflow: Making ETL Easier

Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.

I'm going to tell you about a wonderful tool for developing ETL processes: Apache Airflow. But Airflow is so versatile and multifaceted that it deserves a closer look even if you don't deal with data flows at all, but simply need to launch some processes periodically and monitor their execution.

And yes, I won't just tell — I'll also show: the article has plenty of code, screenshots, and recommendations.

What you usually see when you google the word Airflow / Wikimedia Commons


Introduction

Apache Airflow is like Django:

  • written in Python,
  • with a nice admin panel,
  • endlessly extensible

— only better, and made for entirely different purposes, namely (as stated above the cut):

  • running and monitoring tasks on an unlimited number of machines (as many as Celery/Kubernetes and your conscience will allow you)
  • with dynamic workflow generation from Python code that is very easy to write and understand
  • and the ability to connect any databases and APIs with each other, using both ready-made components and home-made plugins (which is extremely simple to do).

We use Apache Airflow like this:

  • we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into DWH and ODS (we have Vertica and Clickhouse);
  • as an advanced cron, which starts the data consolidation processes on the ODS and also monitors their maintenance.

Until recently, our needs were covered by a single small server with 32 cores and 50 GB of RAM. In Airflow this works out to:

  • more than 200 dags (workflows, in fact, into which we stuff tasks),
  • with around 70 tasks in each on average,
  • and this goodness kicks off (also on average) once an hour.

I'll write about how we scaled further down below, but for now let's define the über-task we are going to solve:

There are three source SQL Servers, each with 50 databases — instances of one project, accordingly, with the same structure (almost everywhere, mwa-ha-ha), which means each has an Orders table (luckily, a table with that name can be pushed into any business). We take the data, add service fields (source server, source database, ETL task ID) and deftly pour it all into, say, Vertica.

Let's go!

Part one, practical (and a little theoretical)

Why we (and you) need it

Back when the trees were big and I was a simple SQL guy in a Russian retail company, we hacked together ETL processes, aka data flows, using the two tools available to us:

  • Informatica Power Center — an extremely sprawling, extremely productive system, with its own hardware and its own versioning. God forbid I used even 1% of its capabilities. Why? Well, first of all, that interface pressed down on us psychologically from somewhere 380 years back. Secondly, the contraption is built for extremely fancy processes, furious component reuse, and other very-important-enterprise tricks. As for the fact that it costs roughly as much as an Airbus A380 wing per year — we'll say nothing.

    Careful: the screenshot may hurt people a little under 30


  • SQL Server Integration Services — we used this comrade in our intra-company flows. Well, honestly: we were already on SQL Server, and it would have been somehow unreasonable not to use its ETL tools. Everything about it is good: the interface is pretty, the progress reports too... But that's not why we love software products, oh no, not for that. We could version its dtsx (which is XML with its nodes shuffled on save), but what's the point? How about making a package of tasks that drags a hundred tables from one server to another? A hundred — why, twenty of them will make your index finger fall off from clicking the mouse button. But it definitely looks more fashionable:


We looked for ways out, of course. The case even nearly came to a self-written SSIS package generator…

…and then a new job found me. And at it, Apache Airflow found me.

When I found out that ETL process descriptions are simple Python code, I just about danced for joy. This is how data flows got versioned and diffed, and pouring tables with the same structure from hundreds of databases into one target became a matter of Python code that fits on one and a half or two 13-inch screens.

Assembling the cluster

Let's not run a complete kindergarten here and talk about perfectly obvious things like installing Airflow, your chosen database, Celery, and the other cases described in the docs.

So that we can start experimenting right away, I sketched a docker-compose.yml in which:

  • we raise Airflow proper: Scheduler, Webserver. Flower will also be spinning there to monitor the Celery tasks (because it's already been shoved into apache/airflow:1.10.10-python3.7, but we don't mind);
  • PostgreSQL, into which Airflow will write its service information (scheduler data, execution statistics, etc.) and in which Celery will mark completed tasks;
  • Redis, which will act as the task broker for Celery;
  • a Celery worker, which will handle the direct execution of tasks;
  • into the folder ./dags we'll put our files with dag descriptions. They get picked up on the fly, so there's no need to juggle the whole stack after every save.

In some places the code in the examples isn't shown in full (so as not to clutter the text), and in some places it gets modified along the way. Complete working code examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • When assembling the compose file, I relied heavily on the well-known image puckel/docker-airflow — be sure to check it out. Maybe you don't need anything else in your life.
  • All Airflow settings are available not only through airflow.cfg, but also through environment variables (thanks to the developers), which I maliciously took advantage of.
  • Naturally, it's not production-ready: I deliberately didn't put heartbeats into the containers, and I didn't bother with security. But I did the minimum suitable for our experiments.
  • Note that:
    • The dag folder must be accessible to both the scheduler and the workers.
    • The same applies to all third-party libraries — they must all be installed on the machines running the scheduler and the workers.

Well, now it's simple:

$ docker-compose up --scale worker=3

After everything has come up, you can look at the web interfaces: Airflow on localhost:8080 and Celery Flower on localhost:5555 (the ports mapped in the compose file above).

Basic concepts

If you didn't understand anything in all these "dags", here's a short dictionary:

  • Scheduler — the most important uncle in Airflow, who makes sure that robots toil hard rather than humans: it watches the schedule, updates dags, and launches tasks.

    In general, in older versions it had memory problems (no, not amnesia — leaks), and a legacy parameter even lived on in the configs: run_duration, its restart interval. But now everything is fine.

  • DAG (aka "dag") — "directed acyclic graph", though a definition like that will tell few people much; in practice it's a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS and a Workflow in Informatica.

    Besides dags there are also subdags, but we most likely won't get to them.

  • DAG Run — an initialized dag, which gets assigned its own execution_date. Dag runs of one dag can happily run in parallel (provided, of course, you've made your tasks idempotent).
  • Operator — a piece of code responsible for performing one specific action. There are three types of operators:
    • actions, like our favorite PythonOperator, which can execute any (valid) Python code;
    • transfers, which carry data from place to place — say, MsSqlToHiveTransfer;
    • sensors, which let you react to events or throttle further execution of the dag until an event occurs. HttpSensor can poll a specified endpoint and, when the desired response arrives, kick off the transfer GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "Why? You could do the retries right inside the operator!" And the answer is: so as not to clog the task pool with suspended operators. A sensor starts, checks, and dies before the next attempt.
  • Task — declared operators, regardless of type, attached to a dag, are promoted to the rank of tasks.
  • Task instance — when the scheduler-general decides it's time to send tasks into battle on the performer-workers (right on the spot if we use LocalExecutor, or to a remote node in the case of CeleryExecutor), it assigns them a context (i.e., a set of variables — execution parameters), expands the command or query templates, and puts them into the pool.

Generating tasks

First, let's outline the general scheme of our dag, and then we'll dive deeper and deeper into the details, because we apply some non-trivial solutions there.

So, in its simplest form, such a dag looks like this:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's figure it out:

  • First we import the necessary libs and something else besides;
  • sql_server_ds is a List[namedtuple[str, str]] with the names of connections from Airflow Connections and the databases from which we'll take our table;
  • dag is the declaration of our dag, which must necessarily live in globals(), otherwise Airflow won't find it. The dag also needs to be told:
    • its name, orders — this name will then show up in the web interface,
    • that it runs starting from midnight on the eighth of July,
    • and that it should launch approximately every 6 hours (for the tough folks here, instead of timedelta() a cron string like 0 */6 * * * is allowed, and for the less tough, an expression like @daily);
  • workflow() will do the main job, but not just yet. For now our function will simply dump the context into the log.
  • And now the simple magic of making tasks:
    • we loop over our sources;
    • we initialize a PythonOperator, which will execute our dummy workflow(). Don't forget to give the task a unique (within the dag) name and to tie it to the dag itself. The provide_context flag, in turn, will pour extra arguments into the function, which we'll carefully collect with **context.
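The module commons.datasources itself isn't shown in the article. A minimal sketch of what sql_server_ds could look like, with purely hypothetical connection and database names (the three servers with 50 databases each come from the task description; the DAG only needs an iterable of (conn_id, schema) pairs):

```python
# Hypothetical sketch of commons/datasources.py.
# All names here are made up for illustration.
from collections import namedtuple

Source = namedtuple('Source', ['conn_id', 'schema'])

# Three source servers (Airflow connection IDs), 50 databases on each
sql_server_ds = [
    Source(conn_id=f'mssql_{server}', schema=f'project_{db:02d}')
    for server in range(3)
    for db in range(50)
]

print(len(sql_server_ds), sql_server_ds[0])
```

Unpacking works exactly as the DAG above expects: `for conn_id, schema in sql_server_ds`.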

That's all for now. What we've got:

  • a new dag in the web interface,
  • one and a half hundred tasks that will execute in parallel (as far as the Airflow settings, Celery, and the server capacity allow).

Well, almost got.

Who's going to install the dependencies?

To simplify this whole thing, I added the processing of requirements.txt on all nodes into docker-compose.yml.

Now it's rolling:


The gray squares are task instances processed by the scheduler.

We wait a bit, and the tasks get snapped up by the workers:


The green ones, of course, have finished their work successfully. The red ones were not so lucky.

By the way, there is no ./dags folder on our production at all, and no syncing between machines — all the dags live in git on our Gitlab, and Gitlab CI rolls the updates out to the machines on merge into master.

A little about Flower

While the workers are hammering away at our dummy tasks, let's recall another tool that can show us a thing or two — Flower.

The very first page, with summary information on the worker nodes:


The busiest page, with tasks that went off to work:


The most boring page, with the status of our broker:


The brightest page, with task state graphs and their execution time:


Loading up the underloaded

So, all the tasks have run; we can carry off the wounded.


And there are quite a few wounded, for one reason or another. When Airflow is used properly, these very squares say that the data definitely did not arrive.

You need to look at the log and restart the fallen task instances.

Clicking on any square shows us the actions available:


You can take the fallen one and Clear it. That is, we forget that something failed there, and the same task instance goes back to the scheduler.


Clearly, doing this with the mouse for every red square is not very humane — and it's not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances


Let's select them all at once and reset them to zero by clicking the correct item:


After the cleanup, our tasks look like this (they're already waiting for the scheduler to schedule them):


Connections, hooks, and other variables

It's time to look at the next DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Dear sirs, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Everyone has written a report updater at some point, right? Here it is again: there's a list of sources to take the data from; there's a list of where to put it; and don't forget to honk when everything succeeded or broke (well, that's not about us, no).

Let's walk through the file again and look at the new obscure bits:

  • from commons.operators import TelegramBotSendMessage — nothing prevents us from writing our own operators, which we took advantage of by building a little wrapper for sending messages to the Unblocked One (that is, Telegram). (We'll talk more about this operator below);
  • default_args={} — a dag can hand out the same arguments to all of its operators;
  • to='{{ var.value.all_the_kings_men }}' — the to field won't be hardcoded but generated dynamically by Jinja from a variable with a list of emails, which I carefully placed in Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — the condition for starting the operator. In our case, the letter flies to the bosses only if all the dependencies have finished successfully;
  • tg_bot_conn_id='tg_main' — conn_id arguments take the IDs of connections that we create in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED — the Telegram message flies off only if there are failed tasks;
  • task_concurrency=1 — we forbid launching several task instances of the same task at once. Otherwise we'd get several VerticaOperator running simultaneously (all staring into one table);
  • report_update >> [email, tg] — every VerticaOperator converges into sending the letter and the message.

    But since the two notifier operators have different trigger conditions, only one of them will fire. In the Tree View, everything looks a little less visual.
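By the way, the >> arrows are nothing magical: BaseOperator overloads Python's shift operators to record dependencies. A toy model (not Airflow's real classes) of how report_update >> [email, tg] wires things up:

```python
# Toy model of Airflow's dependency arrows: BaseOperator implements
# __rshift__ (and __lshift__), so `a >> b` records b as downstream of a.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Both `a >> b` and `a >> [b, c]` work, just like in Airflow
        targets = other if isinstance(other, list) else [other]
        self.downstream.extend(targets)
        return other


report_update = Task('city_orders')
email = Task('email_success')
tg = Task('telegram_fail')

report_update >> [email, tg]
print([t.task_id for t in report_update.downstream])
```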

I'll say a few words about macros and their friends — variables.

Macros are Jinja placeholders that can substitute various useful bits of information into operator arguments. For example, like this:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} expands to the contents of the context variable execution_date in YYYY-MM-DD format: 2020-07-14. The best part is that context variables are nailed to a specific task instance (a square in the Tree View), and on restart the placeholders expand to the very same values.
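Outside Airflow, the mechanics are plain Jinja. A toy rendering of the query above, using the jinja2 package directly with a hand-built stand-in for the context Airflow would pass in:

```python
# Airflow renders template_fields through Jinja with the task instance's
# context; here we imitate that by hand for the {{ ds }} placeholder.
from jinja2 import Template

sql = """
SELECT id, payment_dtm, payment_type, client_id
FROM orders.payments
WHERE payment_dtm::DATE = '{{ ds }}'::DATE
"""

# A minimal stand-in for the real task instance context
context = {'ds': '2020-07-14'}
print(Template(sql).render(context))
```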

The substituted values can be viewed with the Rendered button on each task instance. Here's the task that sends the letter:


And here's the task that sends the message:


A complete list of built-in macros for the latest version is available here: Macros Reference

Moreover, with the help of plugins we can declare our own macros, but that's another story.

Besides the built-in stuff, we can substitute the values of our own variables (I already used this in the code above). Let's create two items in Admin/Variables:


That's it — now you can use them:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

A value can be a scalar, or it can be JSON. In the case of JSON:

bot_config

{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}

just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
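Under the hood there's no magic in that lookup: the stored Variable is a string run through json.loads, after which the template walks the keys. The same resolution done by hand in plain Python (token value copied from the example above):

```python
# What {{ var.json.bot_config.bot.token }} boils down to:
# parse the stored Variable as JSON and follow the key path.
import json

stored_value = '''
{
    "bot": {"token": "881hskdfASDA16641", "name": "Verter"},
    "service": "TG"
}
'''

bot_config = json.loads(stored_value)
print(bot_config['bot']['token'])
```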

I'll say literally one word and show one screenshot about connections. Everything here is elementary: on the Admin/Connections page we create a connection and add our logins/passwords and more specific parameters there. Like this:


Passwords can be encrypted (more thoroughly than by default), or you can leave the connection type out (as I did for tg_main) — the thing is, the list of types is hardwired into Airflow's models and can't be extended without getting into the source code (if I suddenly failed to google something here, please correct me), but nothing stops us from fetching credentials simply by name.

You can also make several connections with the same name: in that case, the method BaseHook.get_connection(), which fetches connections by name, will hand out a random one of the several namesakes (it would be more logical to do Round Robin, but let's leave that on the conscience of the Airflow developers).

Variables and Connections are certainly cool tools, but it's important not to lose the balance: which parts of your flows you keep in the code itself, and which you hand to Airflow for storage. On the one hand, it can be convenient to quickly change a value — say, a mailbox — through the UI. On the other hand, this is again a return to the mouse click, which we (I) wanted to get away from.

Working with connections is one of the jobs of hooks. In general, Airflow hooks are points for connecting it to third-party services and libraries. For example, JiraHook opens a client for us to interact with Jira (you can move tasks back and forth), and with the help of SambaHook you can push a local file to an smb point.

Dissecting a custom operator

And we've come right up to looking at how TelegramBotSendMessage is made.

The code of commons/operators.py with the operator itself:

from typing import Union

from airflow.models import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, as with everything else in Airflow, it's all very simple:

  • We inherit from BaseOperator, which implements quite a few Airflow-specific things (take a look at your leisure);
  • We declare the fields template_fields, in which Jinja will look for macros to process;
  • We arrange the right arguments for __init__() and set defaults where needed;
  • We don't forget to initialize the ancestor either;
  • We open the corresponding hook, TelegramBotHook, and get a client object from it;
  • We override the method BaseOperator.execute(), which Airflow will yank when the time comes to launch the operator — that's where we implement the main action, not forgetting to log. (We log, by the way, straight to stdout and stderr — Airflow intercepts it all, wraps it up beautifully, and files it away where needed.)

Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don't even know what to explain here, so I'll just note the important points:

  • We inherit and think about the arguments — in most cases there will be just one: conn_id;
  • We override the standard methods: I limited myself to get_conn(), in which I fetch the connection parameters by name and simply grab the extra section (a JSON field), into which I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • I create an instance of our TelegramBot, handing it that specific token.

That's all. You can get a client from the hook with TelegramBotHook().client or TelegramBotHook().get_conn().

And the second part of the file, where I make a micro-wrapper for the Telegram REST API, so as not to drag in the whole python-telegram-bot for the sake of a single method, sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

The right way is to bundle it all up — TelegramBotSendMessage, TelegramBotHook, TelegramBot — into a plugin, put it in a public repository, and hand it over to Open Source.

While we were studying all this, our report updates managed to fail successfully and send me an error message. I'll go check whether it's a false alarm…

Something broke in our doge! Isn't that just what we were expecting? Exactly!

Going to pour?

Feel like I missed something? He promised, it seems, to move data from SQL Server into Vertica, and then up and wandered off topic, the scoundrel!

This villainy was intentional: I simply had to decipher some terminology for you first. Now we can go further.

Here's our plan:

  1. Make the dag
  2. Generate the tasks
  3. Admire how beautiful everything is
  4. Assign session numbers to the loads
  5. Get the data from SQL Server
  6. Put the data into Vertica
  7. Collect statistics

So, to get all this up and running, I made a small addition to our docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: 'Y'
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

There we raise:

  • Vertica as the dwh host, with the most default settings,
  • three instances of SQL Server,
  • and we fill the databases on the latter with some data (do not, under any circumstances, look into mssql_init.py!)

We launch all this goodness with a command slightly more complicated than last time:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

To see what our miracle randomizer generated, you can use the Data Profiling/Ad Hoc Query item:

The main thing is not to show it to the analysts

I won't dwell on ETL sessions in detail; everything there is trivial: we make a database, there's a table in it, we wrap everything with a context manager, and now we do this:

with Session(connection, task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any((exc_type, exc_val, exc_tb)):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        # DDL returns no rows, so we bypass self._execute() and its fetchone()
        with self.connection.cursor() as cursor:
            cursor.execute(query)

    def open(self):
        self._create()  # make sure the sessions table exists
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass
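The context-manager protocol above (open the session on entry, record the failure and close on exit) can be sketched without a database. `DemoSession` below is a hypothetical stand-in that mimics only the `__enter__`/`__exit__` bookkeeping, not the SQL:

```python
# Minimal stand-in for the Session class above: no database,
# just the __enter__/__exit__ bookkeeping. DemoSession is hypothetical.
class DemoSession:
    def __init__(self, task_name):
        self.task_name = task_name
        self.successful = None
        self.comment = None
        self.closed = False

    def __enter__(self):
        # In the real class this INSERTs a row and remembers its id.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # An exception escaped the with-block: mark the session failed.
            self.successful = False
            self.comment = f'{exc_type.__name__}: {exc_val}'
        self.closed = True
        # Returning None (falsy) lets the exception propagate to the caller.


try:
    with DemoSession('orders_load') as s:
        raise ValueError('boom')
except ValueError:
    pass

print(s.successful, s.closed)  # False True
```

The key design point: `__exit__` returning a falsy value means the exception still reaches Airflow, so the task is marked failed while the session row keeps the error text.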

The time has come to get our data from those one and a half hundred tables. Let's do it with the help of a few very unpretentious lines:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. With the help of a hook we get a pymssql connection from Airflow.
  2. We substitute a restriction in the form of a date into the query - the templating engine will throw it into the function.
  3. We feed our query to pandas, which will get us a DataFrame - it will come in handy later.

I use the {dt} substitution instead of a %s query parameter not because I'm an evil Pinocchio, but because pandas can't cope with pymssql and slips it params: List, although it really wants a tuple.
Also note that the pymssql developer has decided not to support it anymore, and it's high time to move on to pyodbc.
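With a driver that accepts a tuple, the parameterized form works without string substitution. A sketch with sqlite3 standing in for the source database (the table and values are made up for illustration):

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE Orders (id INT, start_time TEXT)")
conn.execute("INSERT INTO Orders VALUES (1, '2020-07-07'), (2, '2020-07-08')")

# The placeholder style is driver-specific: '?' for sqlite3, '%s' for pymssql.
df = pd.read_sql_query(
    "SELECT id FROM Orders WHERE start_time = ?",
    conn,
    params=('2020-07-07',))  # a tuple, which is what the DB-API wants

print(len(df))  # 1
```

This is the form you'd prefer over f-string substitution whenever the driver cooperates, since the parameter never touches the SQL text.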

Let's see what Airflow has stuffed our function's arguments with:

Apache Airflow: Making ETL Easy

If there is no data, then there is no point in continuing. But it would also be strange to consider the fill successful. And yet it isn't an error. A-ah-ah, what to do?! Here's what:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that there were no errors, but we skipped the task. The interface will show not a green or a red square, but a pink one.

Let's throw a few extra columns onto our data:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Namely:

  • The database we took the orders from,
  • The ID of our flooding session (it will be different for every task),
  • A hash of the source and the order ID - so that in the final database (where everything is poured into one table) we get a unique order ID.
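A note on that hash: `hash_pandas_object` produces a deterministic uint64 hash per row, so the same (source, order id) pair always maps to the same `hash_id`. The snippet above hashes with the default `index=True`; the sketch below passes `index=False` so only the column values matter (the column values are made up):

```python
import pandas as pd
from pandas.util import hash_pandas_object

df = pd.DataFrame({
    'etl_source': ['db_one', 'db_one', 'db_two'],
    'id':         [1, 2, 1],
})

# index=False: hash only the column values, not the row index,
# so the result does not depend on row position.
hashes = hash_pandas_object(df[['etl_source', 'id']], index=False)

print(hashes.dtype)  # uint64
# Same order id, different source -> different hash:
print(hashes[0] != hashes[2])
```

Both variants are stable across runs, since pandas uses a fixed hash key by default.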

Our penultimate step: pour everything into Vertica. And, strangely enough, one of the most spectacular and efficient ways to do this is via CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({','.join(df.columns)}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. We make a special receiver, a StringIO.
  2. pandas will kindly put our DataFrame into it as CSV lines.
  3. Let's open a connection to our beloved Vertica with a hook.
  4. And now, with the help of copy(), we send our data straight to Vertica!
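What the buffer actually holds is plain delimited text, with NULLs spelled out as the `na_rep` marker that the COPY statement's `NULL 'NUL'` clause then recognizes. A toy example of the same `to_csv` call (the tiny DataFrame is made up):

```python
import csv
from io import StringIO

import pandas as pd

df = pd.DataFrame({'id': [1, 2], 'data': ['a', None]})

buffer = StringIO()
# Same flags as in the load task: pipe-delimited, no header, NULLs as 'NUL'.
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False)
buffer.seek(0)

print(buffer.getvalue())
# 1|a
# 2|NUL
```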

From the driver we'll find out how many rows were loaded, and tell the session manager that everything is fine:

session.loaded_rows = cursor.rowcount
session.successful = True

That's all.

In production, we create the target table by hand. Here I allowed myself a little automation:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Using VerticaOperator() I create the target schema and table (if they don't exist yet, of course). The main thing is to arrange the dependencies correctly:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Summing up

"Well," said the little mouse, "now it's clear:
are you convinced that I am the scariest beast in the forest?"

Julia Donaldson, The Gruffalo

I imagine my colleagues and me having a competition: who will create and launch an ETL process from scratch faster - them with their SSIS and a mouse, and me with Airflow... And then we would also compare ease of maintenance... Wow, I think you'll agree that I would beat them on all fronts!

To be a bit more serious: Apache Airflow - by describing processes in the form of program code - has made my job much more comfortable and enjoyable.

Its unlimited extensibility, both in terms of plug-ins and its predisposition to scalability, gives you the opportunity to use Airflow in almost any area: even in the full cycle of collecting, preparing and processing data, even in launching rockets (to Mars, of course).

The final part, references and information

Pitfalls we have already collected for you

  • start_date. Yes, this is already a local meme. Everyone trips over the DAG's main argument start_date. In short, if you set start_date to the current date and schedule_interval to one day, the DAG will launch no earlier than tomorrow.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    And no more problems.

    There is another runtime error associated with it: Task is missing the start_date parameter, which most often indicates that you forgot to bind the operator to the dag.

  • Everything on one machine. Yes, the databases (Airflow's own and our wrapper's), a web server, the scheduler, and the workers. And it even worked. But over time the number of tasks grew, and when PostgreSQL started responding to an index lookup in 20 s instead of 5 ms, we packed it up and carried it away.
  • LocalExecutor. Yes, we are still sitting on it, and we have already reached the edge of the abyss. LocalExecutor has been enough for us so far, but now it's time to expand with at least one more worker, and we will have to work hard to move to CeleryExecutor. And given that it can work on a single machine, nothing stops you from using Celery even on one server, which "of course will never go into production, honestly!"
  • Not using the built-in tools:
    • Connections for storing service credentials,
    • SLA misses for reacting to tasks that didn't finish on time,
    • XCom for exchanging metadata (I said metadata!) between dag tasks.
  • Mail abuse. Well, what can I say? Alerts were set up for all retries of failed tasks. Now my work Gmail has >90k emails from Airflow, and the webmail interface refuses to pick up and delete more than 100 of them at a time.
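The start_date pitfall from the first bullet is just interval arithmetic: a run covering the period [start_date, start_date + schedule_interval) is triggered only once that period is over, which in plain Python (no Airflow needed) looks like this:

```python
from datetime import datetime, timedelta

start_date = datetime(2020, 7, 7, 0, 1, 2)
schedule_interval = timedelta(days=1)

# Airflow schedules a run for a period only after the period has ended,
# so the very first run fires at start_date + schedule_interval -
# i.e. a DAG with today's start_date and a daily interval runs tomorrow.
first_run_fires_at = start_date + schedule_interval
print(first_run_fires_at)  # 2020-07-08 00:01:02
```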

More pitfalls: Apache Airflow Pitfalls

More automation tools

So that we can work with our heads even more, and not with our hands, Airflow has prepared for us:

  • REST API - it is still in experimental status, which doesn't stop it from working. With it you can not only get information about dags and tasks, but also stop/start a dag, create a DAG Run or a pool.
  • CLI - many tools are available from the command line that are not just inconvenient to use through the WebUI, but simply absent from it. For example:
    • backfill is needed to restart task instances.
      For example, the analysts come and say: "And you, comrade, have nonsense in the data from January 1 to 13! Fix it, fix it, fix it, fix it!" And you, cool as a cucumber:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Database service: initdb, resetdb, upgradedb, checkdb.
    • run, which lets you run a single task instance, and even ignore all its dependencies. Moreover, you can run it via LocalExecutor, even if you have a Celery cluster.
    • test does pretty much the same thing, only it writes nothing to the databases.
    • connections allows mass creation of connections from the shell.
  • Python API - a rather hardcore way of interacting, intended for plugins, not for poking around in it with your little hands. But who's to stop us from going to /home/airflow/dags, running ipython and starting to mess around? You can, for example, export all connections with the following code:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connecting to the Airflow metadatabase. I don't recommend writing to it, but fetching task states for various specific metrics can be much faster and easier than through any of the APIs.

    Let's say that not all of our tasks are bulletproof - they can fall over sometimes, and that's normal. But a few failures in a row are already suspicious, and it would be worth checking.

    Careful, SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

References

And of course, the first ten links from a Google search are the contents of the Airflow folder from my bookmarks.

And the links used in the article:

Source: www.habr.com