Llif Awyr Apache: Gwneud ETL yn Haws

Helo, Dmitry Logvinenko ydw i - Peiriannydd Data Adran Ddadansoddeg grŵp cwmnïau Vezet.

Byddaf yn dweud wrthych am offeryn gwych ar gyfer datblygu prosesau ETL - Apache Airflow. Ond mae Airflow mor amlbwrpas ac amlochrog fel y dylech edrych yn agosach arno hyd yn oed os nad ydych yn ymwneud â llif data, ond bod angen lansio unrhyw brosesau o bryd i'w gilydd a monitro eu gweithrediad.

Ac ie, byddaf nid yn unig yn dweud, ond hefyd yn dangos: mae gan y rhaglen lawer o god, sgrinluniau ac argymhellion.

Llif Awyr Apache: Gwneud ETL yn Haws
Beth fyddwch chi'n ei weld fel arfer pan fyddwch chi'n google y gair Airflow / Wikimedia Commons

Tabl cynnwys

Cyflwyniad

Mae Apache Airflow yn union fel Django:

  • wedi ei ysgrifennu yn python
  • mae yna banel gweinyddol gwych,
  • ehangu am gyfnod amhenodol

- dim ond yn well, ac fe'i gwnaed i ddibenion hollol wahanol, sef (fel y mae'n ysgrifenedig cyn y kata):

  • rhedeg a monitro tasgau ar nifer anghyfyngedig o beiriannau (fel y bydd llawer o Seleri / Kubernetes a'ch cydwybod yn caniatáu ichi)
  • gyda chynhyrchu llif gwaith deinamig o hawdd iawn i ysgrifennu a deall cod Python
  • a'r gallu i gysylltu unrhyw gronfeydd data ac APIs â'i gilydd gan ddefnyddio cydrannau parod ac ategion cartref (sy'n hynod o syml).

Rydym yn defnyddio Apache Airflow fel hyn:

  • rydym yn casglu data o wahanol ffynonellau (llawer o achosion SQL Server a PostgreSQL, APIs amrywiol gyda metrigau cais, hyd yn oed 1C) yn DWH ac ODS (mae gennym Vertica a Clickhouse).
  • pa mor ddatblygedig cron, sy'n cychwyn y prosesau cydgrynhoi data ar yr ODS, a hefyd yn monitro eu cynnal.

Tan yn ddiweddar, roedd ein hanghenion yn cael eu cwmpasu gan un gweinydd bach gyda 32 cores a 50 GB o RAM. Mewn Llif Awyr, mae hyn yn gweithio:

  • mwy 200 o dagiau (llifoedd gwaith mewn gwirionedd, lle buom yn stwffio tasgau),
  • ym mhob un ar gyfartaledd 70 o dasgau,
  • mae'r daioni hwn yn dechrau (hefyd ar gyfartaledd) unwaith yr awr.

Ac am sut y gwnaethom ehangu, byddaf yn ysgrifennu isod, ond yn awr gadewch i ni ddiffinio'r über-problem y byddwn yn ei datrys:

Mae yna dri Gweinyddwr SQL gwreiddiol, pob un â 50 o gronfeydd data - enghreifftiau o un prosiect, yn y drefn honno, mae ganddyn nhw'r un strwythur (bron ym mhobman, mua-ha-ha), sy'n golygu bod gan bob un dabl Gorchmynion (yn ffodus, tabl gyda hynny gellir gwthio enw i mewn i unrhyw fusnes). Rydym yn cymryd y data trwy ychwanegu meysydd gwasanaeth (gweinydd ffynhonnell, cronfa ddata ffynhonnell, ID tasg ETL) ac yn eu taflu'n naïf i, dyweder, Vertica.

Gadewch i ni fynd!

Y brif ran, ymarferol (ac ychydig yn ddamcaniaethol)

Pam ydyn ni (a chi)

Pan oedd y coed yn fawr ac roeddwn i'n syml SQL-schik mewn un adwerthu yn Rwsia, fe wnaethon ni sgamio prosesau ETL fel llif data gan ddefnyddio dau offeryn sydd ar gael i ni:

  • Canolfan Bwer Informatica - system hynod o wasgaredig, hynod gynhyrchiol, gyda'i chaledwedd ei hun, ei fersiwn ei hun. Defnyddiais Dduw yn gwahardd 1% o'i alluoedd. Pam? Wel, yn gyntaf oll, mae'r rhyngwyneb hwn, rhywle o'r 380au, yn feddyliol wedi rhoi pwysau arnom ni. Yn ail, mae'r contraption hwn wedi'i gynllunio ar gyfer prosesau hynod ffansi, ailddefnyddio cydrannau ffyrnig a thriciau menter-pwysig iawn eraill. Ynglŷn â'r ffaith ei fod yn costio, fel adain yr Airbus AXNUMX / blwyddyn, ni fyddwn yn dweud unrhyw beth.

    Byddwch yn ofalus, gall sgrinlun brifo pobl o dan 30 ychydig

    Llif Awyr Apache: Gwneud ETL yn Haws

  • SQL Gweinyddwr Integreiddio Gweinydd - defnyddiwyd y cymrawd hwn yn ein llifoedd o fewn y prosiect. Wel, mewn gwirionedd: rydym eisoes yn defnyddio SQL Server, a byddai'n afresymol rhywsut i beidio â defnyddio ei offer ETL. Mae popeth ynddo yn dda: mae'r rhyngwyneb yn brydferth, ac mae'r adroddiadau cynnydd ... Ond nid dyma pam rydyn ni'n caru cynhyrchion meddalwedd, o, nid ar gyfer hyn. Fersiwn iddo dtsx (sef XML gyda nodau siffrwd ar gadw) gallwn, ond beth yw'r pwynt? Beth am wneud pecyn tasg a fydd yn llusgo cannoedd o dablau o un gweinydd i'r llall? Ie, am gant, bydd eich mynegfys yn disgyn oddi ar ugain darn, gan glicio ar fotwm y llygoden. Ond mae'n bendant yn edrych yn fwy ffasiynol:

    Llif Awyr Apache: Gwneud ETL yn Haws

Yn sicr fe wnaethon ni chwilio am ffyrdd allan. Achos hyd yn oed bron daeth i gynhyrchydd pecyn SSIS hunan-ysgrifenedig ...

…ac yna daeth swydd newydd o hyd i mi. A goddiweddodd Apache Airflow fi arno.

Pan wnes i ddarganfod bod disgrifiadau proses ETL yn god Python syml, wnes i ddim dawnsio er llawenydd. Dyma sut y cafodd ffrydiau data eu fersiwnio a'u gwahaniaethu, a daeth arllwys tablau gydag un strwythur o gannoedd o gronfeydd data i mewn i un targed yn fater o god Python mewn un a hanner neu ddwy sgrin 13”.

Cydosod y clwstwr

Gadewch i ni beidio â threfnu kindergarten yn gyfan gwbl, a pheidio â siarad am bethau cwbl amlwg yma, fel gosod Airflow, y gronfa ddata o'ch dewis, Seleri ac achosion eraill a ddisgrifir yn y dociau.

Er mwyn i ni allu dechrau arbrofion ar unwaith, brasluniais docker-compose.yml lle:

  • Gadewch i ni godi mewn gwirionedd Llif aer: Trefnydd, gweinydd gwe. Bydd Flower hefyd yn troelli yno i fonitro tasgau Seleri (gan ei fod eisoes wedi cael ei wthio i mewn apache/airflow:1.10.10-python3.7, ond does dim ots gennym ni)
  • PostgreSQL, lle bydd Airflow yn ysgrifennu ei wybodaeth gwasanaeth (data amserlennydd, ystadegau gweithredu, ac ati), a bydd Seleri yn marcio tasgau gorffenedig;
  • Redis, a fydd yn gweithredu fel brocer tasgau ar gyfer Seleri;
  • Gweithiwr seleri, a fydd yn ymwneud â chyflawni tasgau'n uniongyrchol.
  • I ffolder ./dags byddwn yn ychwanegu ein ffeiliau gyda'r disgrifiad o dags. Byddant yn cael eu codi ar y hedfan, felly nid oes angen jyglo'r pentwr cyfan ar ôl pob tisian.

Mewn rhai mannau, nid yw'r cod yn yr enghreifftiau yn cael ei ddangos yn gyfan gwbl (er mwyn peidio ag annibendod y testun), ond yn rhywle mae'n cael ei addasu yn y broses. Mae enghreifftiau o god gweithio cyflawn i'w gweld yn y gadwrfa https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Примечания:

  • Yng nghynulliad y cyfansoddiad, dibynnais i raddau helaeth ar y ddelwedd adnabyddus pwcl/dociwr-llif aer - gofalwch eich bod yn edrych arno. Efallai nad oes angen unrhyw beth arall yn eich bywyd.
  • Mae pob gosodiad Airflow ar gael nid yn unig drwodd airflow.cfg, ond hefyd trwy newidynnau amgylchedd (diolch i'r datblygwyr), y cymerais fantais yn faleisus.
  • Yn naturiol, nid yw'n barod i gynhyrchu: ni wnes i roi curiadau calon ar gynwysyddion yn fwriadol, ni wnes i drafferthu â diogelwch. Ond fe wnes i'r lleiafswm sy'n addas ar gyfer ein harbrofwyr.
  • Sylwch fod:
    • Rhaid i'r ffolder dag fod yn hygyrch i'r trefnydd a'r gweithwyr.
    • Mae'r un peth yn wir am bob llyfrgell trydydd parti - rhaid eu gosod i gyd ar beiriannau gyda rhaglennydd a gweithwyr.

Wel, nawr mae'n syml:

$ docker-compose up --scale worker=3

Ar ôl i bopeth godi, gallwch edrych ar y rhyngwynebau gwe:

Cysyniadau sylfaenol

Os nad oeddech chi'n deall unrhyw beth yn yr holl “dagiau”, yna dyma eiriadur byr:

  • Scheduler - yr ewythr pwysicaf yn Airflow, sy'n rheoli bod robotiaid yn gweithio'n galed, ac nid person: yn monitro'r amserlen, yn diweddaru dagiau, yn lansio tasgau.

    Yn gyffredinol, mewn fersiynau hŷn, roedd ganddo broblemau gyda'r cof (na, nid amnesia, ond gollyngiadau) ac roedd y paramedr etifeddiaeth hyd yn oed yn aros yn y cyfluniadau run_duration - ei egwyl ailgychwyn. Ond nawr mae popeth yn iawn.

  • DAG (aka "dag") - "graff acyclic cyfeiriedig", ond bydd diffiniad o'r fath yn dweud ychydig o bobl, ond mewn gwirionedd mae'n gynhwysydd ar gyfer tasgau sy'n rhyngweithio â'i gilydd (gweler isod) neu analog o Pecyn yn SSIS a Llif Gwaith yn Informatica .

    Yn ogystal â dagiau, efallai y bydd subdagiau o hyd, ond yn fwyaf tebygol ni fyddwn yn cyrraedd atynt.

  • Rhedeg DAG - dag cychwynnol, sy'n cael ei neilltuo ei hun execution_date. Gall dagrans o'r un dag weithio ochr yn ochr (os ydych chi wedi gwneud eich tasgau'n analluog, wrth gwrs).
  • Gweithredwr yn ddarnau o god sy'n gyfrifol am gyflawni gweithred benodol. Mae tri math o weithredwyr:
    • gweithredufel ein ffefryn PythonOperator, a all weithredu unrhyw god Python (dilys);
    • trosglwyddo, sy'n cludo data o le i le, dyweder, MsSqlToHiveTransfer;
    • synhwyrydd ar y llaw arall, bydd yn caniatáu ichi ymateb neu arafu gweithrediad pellach y dag nes bod digwyddiad yn digwydd. HttpSensor yn gallu tynnu'r pwynt terfyn penodedig, a phan fydd yr ymateb a ddymunir yn aros, dechreuwch y trosglwyddiad GoogleCloudStorageToS3Operator. Bydd meddwl chwilfrydig yn gofyn: “pam? Wedi'r cyfan, gallwch chi ailadrodd yn iawn yn y gweithredwr!" Ac yna, er mwyn peidio â chlocsio'r gronfa o dasgau gyda gweithredwyr ataliedig. Mae'r synhwyrydd yn cychwyn, yn gwirio ac yn marw cyn yr ymgais nesaf.
  • Gorchwyl - mae gweithredwyr datganedig, waeth beth fo'u math, ac sydd ynghlwm wrth y dag yn cael eu dyrchafu i reng y dasg.
  • enghraifft tasg - pan benderfynodd y cynlluniwr cyffredinol ei bod hi'n bryd anfon tasgau i'r frwydr ar y perfformwyr-weithwyr (yn y fan a'r lle, os ydyn ni'n defnyddio LocalExecutor neu i nod pell yn achos CeleryExecutor), mae'n aseinio cyd-destun iddynt (h.y., set o newidynnau - paramedrau gweithredu), yn ehangu templedi gorchymyn neu ymholiad, ac yn eu cyfuno.

Rydym yn cynhyrchu tasgau

Yn gyntaf, gadewch i ni amlinellu cynllun cyffredinol ein doug, ac yna byddwn yn plymio i'r manylion fwyfwy, oherwydd ein bod yn cymhwyso rhai atebion nad ydynt yn ddibwys.

Felly, yn ei ffurf symlaf, bydd dag o'r fath yn edrych fel hyn:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Gadewch i ni ddarganfod:

  • Yn gyntaf, rydym yn mewnforio y libs angenrheidiol a Rhywbeth arall;
  • sql_server_ds - A yw List[namedtuple[str, str]] gydag enwau'r cysylltiadau o Airflow Connections a'r cronfeydd data y byddwn yn cymryd ein plât ohonynt;
  • dag — cyhoeddiad ein doug, y mae yn rhaid fod ynddo o angenrheidrwydd globals(), fel arall ni fydd Airflow yn dod o hyd iddo. Mae angen i Doug ddweud hefyd:
    • beth yw ei enw orders - bydd yr enw hwn wedyn yn ymddangos yn y rhyngwyneb gwe,
    • y bydd yn gweithio o hanner nos ar yr wythfed o Orffennaf,
    • a dylai redeg, tua bob 6 awr (ar gyfer dynion caled yma yn lle timedelta() derbyniadwy cron-llinell 0 0 0/6 ? * * *, ar gyfer y llai oer - mynegiant fel @daily);
  • workflow() yn gwneud y prif waith, ond nid nawr. Am y tro, byddwn yn taflu ein cyd-destun i'r log.
  • Ac yn awr yr hud syml o greu tasgau:
    • rydym yn rhedeg trwy ein ffynonellau;
    • ymgychwyn PythonOperator, a fydd yn gweithredu ein dymi workflow(). Peidiwch ag anghofio nodi enw unigryw (o fewn y dag) y dasg a chlymu'r dag ei ​​hun. Baner provide_context yn ei dro, yn arllwys dadleuon ychwanegol i'r swyddogaeth, y byddwn yn eu casglu'n ofalus gan ddefnyddio **context.

Am y tro, dyna i gyd. Yr hyn a gawsom:

  • dag newydd yn y rhyngwyneb gwe,
  • cant a hanner o dasgau a fydd yn cael eu cyflawni ochr yn ochr (os yw'r gosodiadau Llif Awyr, Seleri a chynhwysedd y gweinydd yn caniatáu hynny).

Wel, bron wedi ei gael.

Llif Awyr Apache: Gwneud ETL yn Haws
Pwy fydd yn gosod y dibyniaethau?

I symleiddio'r holl beth hwn, fe wnes i sgriwio i mewn docker-compose.yml prosesu requirements.txt ar bob nod.

Nawr mae wedi mynd:

Llif Awyr Apache: Gwneud ETL yn Haws

Mae sgwariau llwyd yn enghreifftiau o dasgau a brosesir gan y trefnydd.

Rydym yn aros ychydig, mae'r tasgau'n cael eu bachu gan y gweithwyr:

Llif Awyr Apache: Gwneud ETL yn Haws

Mae'r rhai gwyrdd, wrth gwrs, wedi cwblhau eu gwaith yn llwyddiannus. Nid yw cochion yn llwyddiannus iawn.

Gyda llaw, nid oes ffolder ar ein prod ./dags, nid oes cydamseriad rhwng peiriannau - mae pob dag yn gorwedd i mewn git ar ein Gitlab, ac mae Gitlab CI yn dosbarthu diweddariadau i beiriannau wrth uno master.

Ychydig am Blodau

Tra mae'r gweithwyr yn dyrnu ein heddychwyr, gadewch i ni gofio arf arall a all ddangos rhywbeth i ni - Flower.

Y dudalen gyntaf gyda gwybodaeth gryno am nodau gweithwyr:

Llif Awyr Apache: Gwneud ETL yn Haws

Y dudalen fwyaf dwys gyda thasgau a aeth i'r gwaith:

Llif Awyr Apache: Gwneud ETL yn Haws

Y dudalen fwyaf diflas gyda statws ein brocer:

Llif Awyr Apache: Gwneud ETL yn Haws

Mae'r dudalen ddisgleiriaf gyda graffiau statws tasg a'u hamser gweithredu:

Llif Awyr Apache: Gwneud ETL yn Haws

Rydym yn llwytho y underloaded

Felly, mae'r holl dasgau wedi gweithio allan, gallwch chi gario'r clwyfedig i ffwrdd.

Llif Awyr Apache: Gwneud ETL yn Haws

Ac roedd yna lawer wedi'u clwyfo - am ryw reswm neu'i gilydd. Yn achos y defnydd cywir o Llif Awyr, mae'r union sgwariau hyn yn dangos na chyrhaeddodd y data yn bendant.

Mae angen i chi wylio'r log ac ailgychwyn yr achosion tasg sydd wedi cwympo.

Drwy glicio ar unrhyw sgwâr, byddwn yn gweld y camau gweithredu sydd ar gael i ni:

Llif Awyr Apache: Gwneud ETL yn Haws

Gallwch chi gymryd a gwneud Clirio'r syrthio. Hynny yw, rydym yn anghofio bod rhywbeth wedi methu yno, a bydd yr un dasg enghraifft yn mynd i'r trefnydd.

Llif Awyr Apache: Gwneud ETL yn Haws

Mae'n amlwg nad yw gwneud hyn gyda'r llygoden gyda'r holl sgwariau coch yn drugarog iawn - nid dyma'r hyn yr ydym yn ei ddisgwyl gan Airflow. Yn naturiol, mae gennym arfau dinistr torfol: Browse/Task Instances

Llif Awyr Apache: Gwneud ETL yn Haws

Gadewch i ni ddewis popeth ar unwaith ac ailosod i sero, cliciwch ar yr eitem gywir:

Llif Awyr Apache: Gwneud ETL yn Haws

Ar ôl glanhau, mae ein tacsis yn edrych fel hyn (maen nhw eisoes yn aros i'r trefnydd eu hamserlennu):

Llif Awyr Apache: Gwneud ETL yn Haws

Cysylltiadau, bachau a newidynnau eraill

Mae'n bryd edrych ar y DAG nesaf, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Ydy pawb erioed wedi diweddaru adroddiad? Dyma hi eto: mae rhestr o ffynonellau o ble i gael y data; mae rhestr lle i roi; peidiwch ag anghofio honk pan ddigwyddodd neu dorrodd popeth (wel, nid yw hyn yn ymwneud â ni, na).

Gadewch i ni fynd trwy'r ffeil eto ac edrych ar y stwff aneglur newydd:

  • from commons.operators import TelegramBotSendMessage - nid oes dim yn ein hatal rhag gwneud ein gweithredwyr ein hunain, y gwnaethom fanteisio arno trwy wneud papur lapio bach ar gyfer anfon negeseuon i Unblocked. (Byddwn yn siarad mwy am y gweithredwr hwn isod);
  • default_args={} — gall dag ddosbarthu yr un dadleuon i'w holl weithredwyr ;
  • to='{{ var.value.all_the_kings_men }}' - maes to ni fydd gennym god caled, ond wedi'i gynhyrchu'n ddeinamig gan ddefnyddio Jinja a newidyn gyda rhestr o negeseuon e-bost, a roddais yn ofalus i mewn Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — amod ar gyfer cychwyn y gweithredwr. Yn ein hachos ni, dim ond os yw'r holl ddibyniaethau wedi gweithio allan y bydd y llythyr yn hedfan i'r penaethiaid yn llwyddiannus;
  • tg_bot_conn_id='tg_main' - dadleuon conn_id derbyn IDau cysylltiad rydym yn creu ynddynt Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - bydd negeseuon yn Telegram yn hedfan i ffwrdd dim ond os bydd tasgau wedi cwympo;
  • task_concurrency=1 - rydym yn gwahardd lansio sawl tasg o un dasg ar yr un pryd. Fel arall, byddwn yn cael lansiad ar yr un pryd o sawl un VerticaOperator (edrych ar un bwrdd);
  • report_update >> [email, tg] - I gyd VerticaOperator cydgyfeirio wrth anfon llythyrau a negeseuon, fel hyn:
    Llif Awyr Apache: Gwneud ETL yn Haws

    Ond gan fod gan weithredwyr hysbyswyr amodau lansio gwahanol, dim ond un fydd yn gweithio. Yn y Tree View, mae popeth yn edrych ychydig yn llai gweledol:
    Llif Awyr Apache: Gwneud ETL yn Haws

Dywedaf ychydig eiriau am macros a'u ffrindiau - newidynnau.

Mae macros yn ddalfannau Jinja a all amnewid gwybodaeth ddefnyddiol amrywiol yn ddadleuon gweithredwr. Er enghraifft, fel hyn:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} yn ehangu i gynnwys y newidyn cyd-destun execution_date mewn fformat YYYY-MM-DD: 2020-07-14. Y rhan orau yw bod newidynnau cyd-destun yn cael eu hoelio i enghraifft o dasg benodol (sgwâr yn y Tree View), a phan fyddant yn cael eu hailddechrau, bydd y dalfannau yn ehangu i'r un gwerthoedd.

Gellir gweld y gwerthoedd a neilltuwyd gan ddefnyddio'r botwm Rendro ar bob tasg. Dyma sut mae'r dasg o anfon llythyr:

Llif Awyr Apache: Gwneud ETL yn Haws

Ac felly ar y dasg o anfon neges:

Llif Awyr Apache: Gwneud ETL yn Haws

Mae rhestr gyflawn o macros adeiledig ar gyfer y fersiwn ddiweddaraf sydd ar gael ar gael yma: cyfeiriad macros

Ar ben hynny, gyda chymorth ategion, gallwn ddatgan ein macros ein hunain, ond stori arall yw honno.

Yn ogystal â'r pethau rhagddiffiniedig, gallwn amnewid gwerthoedd ein newidynnau (defnyddiais hyn eisoes yn y cod uchod). Gadewch i ni greu i mewn Admin/Variables cwpl o bethau:

Llif Awyr Apache: Gwneud ETL yn Haws

Popeth y gallwch ei ddefnyddio:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Gall y gwerth fod yn sgalar, neu gall fod yn JSON hefyd. Yn achos JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

defnyddiwch y llwybr i'r allwedd a ddymunir: {{ var.json.bot_config.bot.token }}.

Byddaf yn llythrennol yn dweud un gair ac yn dangos un sgrinlun amdano cysylltiadau. Mae popeth yn elfennol yma: ar y dudalen Admin/Connections rydym yn creu cysylltiad, yn ychwanegu ein mewngofnodi / cyfrineiriau a pharamedrau mwy penodol yno. Fel hyn:

Llif Awyr Apache: Gwneud ETL yn Haws

Gellir amgryptio cyfrineiriau (yn fwy trylwyr na'r rhagosodedig), neu gallwch adael y math o gysylltiad allan (fel y gwnes ar gyfer tg_main) - y ffaith yw bod y rhestr o fathau yn galed mewn modelau Airflow ac ni ellir ei ehangu heb fynd i mewn i'r codau ffynhonnell (os yn sydyn ni wnes i google rhywbeth, cywirwch fi), ond ni fydd dim yn ein hatal rhag cael credydau yn unig enw.

Gallwch hefyd wneud sawl cysylltiad gyda'r un enw: yn yr achos hwn, y dull BaseHook.get_connection(), sy'n cael cysylltiadau i ni yn ôl enw, yn rhoi ar hap o nifer o enwau (byddai'n fwy rhesymegol i wneud Rownd Robin, ond gadewch i ni ei adael ar gydwybod y datblygwyr Airflow).

Mae Newidynnau a Chysylltiadau yn sicr yn offer cŵl, ond mae'n bwysig peidio â cholli'r cydbwysedd: pa rannau o'ch llifau rydych chi'n eu storio yn y cod ei hun, a pha rannau rydych chi'n eu rhoi i Airflow i'w storio. Ar y naill law, gall fod yn gyfleus newid y gwerth yn gyflym, er enghraifft, blwch postio, trwy'r UI. Ar y llaw arall, mae hwn yn dal i fod yn dychwelyd i'r clic llygoden, yr oeddem ni (fi) am gael gwared ohono.

Gweithio gyda chysylltiadau yw un o'r tasgau bachau. Yn gyffredinol, mae bachau Llif Awyr yn bwyntiau ar gyfer ei gysylltu â gwasanaethau a llyfrgelloedd trydydd parti. Ee, JiraHook yn agor cleient i ni ryngweithio â Jira (gallwch symud tasgau yn ôl ac ymlaen), a gyda chymorth SambaHook gallwch chi wthio ffeil leol i smb-pwynt.

Dosrannu'r gweithredwr arferiad

Ac fe ddaethon ni'n agos at edrych ar sut mae'n cael ei wneud TelegramBotSendMessage

Cod commons/operators.py gyda'r gweithredwr gwirioneddol:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Yma, fel popeth arall yn Airflow, mae popeth yn syml iawn:

  • Etifeddwyd o BaseOperator, sy'n gweithredu cryn dipyn o bethau llif aer penodol (edrychwch ar eich hamdden)
  • Meysydd a ddatganwyd template_fields, lle bydd Jinja yn chwilio am macros i'w prosesu.
  • Wedi trefnu'r dadleuon cywir o blaid __init__(), gosodwch y rhagosodiadau lle bo angen.
  • Wnaethon ni ddim anghofio am ddechreuad yr hynafiad chwaith.
  • Wedi agor y bachyn cyfatebol TelegramBotHookwedi derbyn gwrthrych cleient ohono.
  • Dull wedi'i ddiystyru (ailddiffinio). BaseOperator.execute(), a fydd Airfow yn twitch pan ddaw'r amser i lansio'r gweithredwr - ynddo byddwn yn gweithredu'r prif weithred, gan anghofio mewngofnodi. (Rydyn ni'n mewngofnodi, gyda llaw, reit i mewn stdout и stderr - Bydd llif aer yn rhyng-gipio popeth, yn ei lapio'n hyfryd, yn ei ddadelfennu lle bo angen.)

Gawn ni weld beth sydd gennym ni commons/hooks.py. Rhan gyntaf y ffeil, gyda'r bachyn ei hun:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Nid wyf hyd yn oed yn gwybod beth i'w esbonio yma, byddaf yn nodi'r pwyntiau pwysig:

  • Rydyn ni'n etifeddu, meddyliwch am y dadleuon - yn y rhan fwyaf o achosion bydd yn un: conn_id;
  • Diystyru dulliau safonol: cyfyngais fy hun get_conn(), yr wyf yn cael y paramedrau cysylltiad yn ôl enw a dim ond yn cael yr adran extra (maes JSON yw hwn), lle rhoddais i (yn ôl fy nghyfarwyddiadau fy hun!) y tocyn Telegram bot: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Rwy'n creu enghraifft o'n TelegramBot, gan roi tocyn penodol iddo.

Dyna i gyd. Gallwch gael cleient o fachyn gan ddefnyddio TelegramBotHook().clent neu TelegramBotHook().get_conn().

Ac ail ran y ffeil, lle rwy'n gwneud microlapiwr ar gyfer y Telegram REST API, er mwyn peidio â llusgo'r un peth python-telegram-bot am un dull sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Y ffordd gywir yw ychwanegu'r cyfan i fyny: TelegramBotSendMessage, TelegramBotHook, TelegramBot - yn yr ategyn, rhowch mewn ystorfa gyhoeddus, a'i roi i Ffynhonnell Agored.

Tra roeddem yn astudio hyn i gyd, llwyddodd ein diweddariadau adroddiad i fethu'n llwyddiannus ac anfon neges gwall ataf yn y sianel. Rydw i'n mynd i wirio i weld a yw'n anghywir ...

Llif Awyr Apache: Gwneud ETL yn Haws
Torrodd rhywbeth yn ein ci! Onid dyna yr oeddem yn ei ddisgwyl? Yn union!

Ydych chi'n mynd i arllwys?

Ydych chi'n teimlo fy mod wedi colli rhywbeth? Mae'n ymddangos ei fod wedi addo trosglwyddo data o SQL Server i Vertica, ac yna fe'i cymerodd a symud oddi ar y pwnc, y scoundrel!

Roedd yr erchyllter hwn yn fwriadol, yn syml iawn roedd yn rhaid imi ddehongli rhywfaint o derminoleg i chi. Nawr gallwch chi fynd ymhellach.

Ein cynllun oedd hyn:

  1. Gwna dag
  2. Cynhyrchu tasgau
  3. Gweld pa mor brydferth yw popeth
  4. Neilltuo rhifau sesiwn i lenwi
  5. Cael data o SQL Server
  6. Rhowch ddata yn Vertica
  7. Casglu ystadegau

Felly, i gael hyn i gyd ar waith, fe wnes i ychwanegiad bach i'n docker-compose.yml:

docwr-cyfansoddi.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Yno rydym yn codi:

  • Vertica fel gwesteiwr dwh gyda'r gosodiadau mwyaf diofyn,
  • tri achos o SQL Server,
  • rydym yn llenwi'r cronfeydd data yn yr olaf gyda rhywfaint o ddata (peidiwch ag ymchwilio i unrhyw achos mssql_init.py!)

Rydyn ni'n lansio'r holl dda gyda chymorth gorchymyn ychydig yn fwy cymhleth na'r tro diwethaf:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Yr hyn a gynhyrchwyd gan ein hapiwr gwyrthiol, gallwch ddefnyddio'r eitem Data Profiling/Ad Hoc Query:

Llif Awyr Apache: Gwneud ETL yn Haws
Y prif beth yw peidio â'i ddangos i ddadansoddwyr

ymhelaethu ar Sesiynau ETL Wn i ddim, mae popeth yn ddibwys yno: rydyn ni'n gwneud sylfaen, mae arwydd ynddo, rydyn ni'n lapio popeth gyda rheolwr cyd-destun, a nawr rydyn ni'n gwneud hyn:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

sesiwn.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Mae'r amser wedi dod casglu ein data o'n byrddau cant a hanner. Gadewch i ni wneud hyn gyda chymorth llinellau diymhongar iawn:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Gyda chymorth bachyn a gawn o Airflow pymssql-cysylltu
  2. Gadewch i ni amnewid cyfyngiad ar ffurf dyddiad yn y cais - bydd yn cael ei daflu i'r swyddogaeth gan y peiriant templed.
  3. Bwydo ein cais pandaspwy fydd yn ein cael DataFrame - bydd yn ddefnyddiol i ni yn y dyfodol.

Rwy'n defnyddio amnewid {dt} yn lle paramedr cais %s nid oherwydd fy mod yn Pinocchio drwg, ond oherwydd pandas methu trin pymssql ac yn llithro yr un olaf params: Lister ei fod wir eisiau tuple.
Sylwch hefyd fod y datblygwr pymssql penderfynodd beidio â'i gefnogi mwyach, ac mae'n bryd symud allan pyodbc.

Gawn ni weld beth wnaeth Airflow stwffio dadleuon ein swyddogaethau gyda:

Llif Awyr Apache: Gwneud ETL yn Haws

Os nad oes data, yna nid oes diben parhau. Ond rhyfedd hefyd yw ystyried y llenwad yn llwyddiannus. Ond nid camgymeriad yw hyn. A-ah-ah, beth i'w wneud?! A dyma beth:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException yn dweud wrth Airflow nad oes unrhyw wallau, ond rydym yn hepgor y dasg. Ni fydd gan y rhyngwyneb sgwâr gwyrdd neu goch, ond pinc.

Gadewch i ni daflu ein data colofnau lluosog:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Sef

  • Y gronfa ddata y cymerwyd yr archebion ohoni,
  • ID ein sesiwn llifogydd (bydd yn wahanol ar gyfer pob tasg),
  • Stwnsh o'r ID ffynhonnell a threfn - fel bod gennym ni ID archeb unigryw yn y gronfa ddata derfynol (lle mae popeth yn cael ei arllwys i un bwrdd).

Erys y cam olaf ond un: arllwyswch bopeth i Vertica. Ac, yn rhyfedd ddigon, un o'r ffyrdd mwyaf trawiadol ac effeithlon o wneud hyn yw trwy CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Rydym yn gwneud derbynnydd arbennig StringIO.
  2. pandas Bydd garedig rhoi ein DataFrame ar y ffurf CSV-llinellau.
  3. Gadewch i ni agor cysylltiad â'n hoff Vertica gyda bachyn.
  4. Ac yn awr gyda chymorth copy() anfon ein data yn uniongyrchol i Vertika!

Byddwn yn cymryd gan y gyrrwr faint o linellau a lenwyd, ac yn dweud wrth y rheolwr sesiwn bod popeth yn iawn:

session.loaded_rows = cursor.rowcount
session.successful = True

Dyna i gyd.

Ar y gwerthiant, rydym yn creu'r plât targed â llaw. Yma fe wnes i ganiatáu peiriant bach i mi fy hun:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Rwy'n defnyddio VerticaOperator() Rwy'n creu sgema cronfa ddata a thabl (os nad ydynt yn bodoli eisoes, wrth gwrs). Y prif beth yw trefnu'r dibyniaethau yn gywir:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Crynhoi

— Wel, — meddai y llygoden fach, — onid ydyw, yn awr
A ydych yn argyhoeddedig mai fi yw'r anifail mwyaf ofnadwy yn y goedwig?

Julia Donaldson, Y Gryffalo

Rwy'n meddwl pe bai gan fy nghydweithwyr a minnau gystadleuaeth: pwy fydd yn creu ac yn lansio proses ETL yn gyflym o'r dechrau: maen nhw gyda'u SSIS a llygoden a fi gyda Airflow ... Ac yna byddem hefyd yn cymharu rhwyddineb cynnal a chadw ... Waw, dwi'n meddwl y byddwch chi'n cytuno y byddaf yn eu curo ar bob ffrynt!

Os ychydig yn fwy difrifol, yna Apache Airflow - trwy ddisgrifio prosesau ar ffurf cod rhaglen - a wnaeth fy swydd llawer yn fwy cyfforddus a phleserus.

Mae ei estynadwyedd diderfyn, o ran ategion a rhagdueddiad i scalability, yn rhoi'r cyfle i chi ddefnyddio Llif Awyr mewn bron unrhyw faes: hyd yn oed yn y cylch llawn o gasglu, paratoi a phrosesu data, hyd yn oed wrth lansio rocedi (i'r blaned Mawrth, o cwrs).

Rhan derfynol, cyfeirnod a gwybodaeth

Y rhaca rydym wedi casglu i chi

  • start_date. Ydy, mae hwn eisoes yn feme lleol. Trwy brif ddadl doug start_date i gyd yn pasio. Yn fyr, os nodwch yn start_date dyddiad presennol, a schedule_interval - un diwrnod, yna bydd DAG yn dechrau yfory dim cynt.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    A dim mwy o broblemau.

    Mae gwall amser rhedeg arall yn gysylltiedig ag ef: Task is missing the start_date parameter, sy'n nodi amlaf eich bod wedi anghofio rhwymo'r gweithredwr dag.

  • I gyd ar un peiriant. Ie, a seiliau (Airflow ei hun a'n cotio), a gweinydd gwe, a scheduler, a gweithwyr. Ac fe weithiodd hyd yn oed. Ond dros amser, tyfodd nifer y tasgau ar gyfer gwasanaethau, a phan ddechreuodd PostgreSQL ymateb i'r mynegai mewn 20 s yn lle 5 ms, fe wnaethom ei gymryd a'i gario i ffwrdd.
  • Gweithredwr Lleol. Ydym, rydym yn dal i eistedd arno, ac rydym eisoes wedi dod i ymyl yr affwys. Mae LocalExecutor wedi bod yn ddigon i ni hyd yn hyn, ond nawr mae'n bryd ehangu gydag o leiaf un gweithiwr, a bydd yn rhaid i ni weithio'n galed i symud i CeleryExecutor. Ac o ystyried y ffaith y gallwch chi weithio gydag ef ar un peiriant, nid oes dim yn eich atal rhag defnyddio Seleri hyd yn oed ar weinydd, na fydd “wrth gwrs, byth yn mynd i mewn i gynhyrchu, a dweud y gwir!”
  • Di-ddefnydd offer adeiledig:
    • Cysylltiadau i storio manylion gwasanaeth,
    • CLG Misses ymateb i dasgau nad oeddent yn gweithio allan ar amser,
    • xcom ar gyfer cyfnewid metadata (dywedais metadata!) rhwng tasgau dag.
  • Camddefnydd post. Wel, beth alla i ddweud? Gosodwyd rhybuddion ar gyfer pob ailadroddiad o dasgau cwympo. Nawr mae gan Gmail fy ngwaith >90k o e-byst gan Airflow, ac mae'r we mail mail yn gwrthod codi a dileu mwy na 100 ar y tro.

Mwy o beryglon: Peryglon Llif Awyr Apache

Mwy o offer awtomeiddio

Er mwyn i ni weithio hyd yn oed yn fwy gyda'n pennau ac nid gyda'n dwylo, mae Airflow wedi paratoi hyn i ni:

  • REST API - mae ganddo statws Arbrofol o hyd, nad yw'n ei atal rhag gweithio. Ag ef, gallwch nid yn unig gael gwybodaeth am dagiau a thasgau, ond hefyd stopio / cychwyn dag, creu DAG Run neu bwll.
  • CLI - mae llawer o offer ar gael trwy'r llinell orchymyn nad ydynt yn anghyfleus yn unig i'w defnyddio trwy'r WebUI, ond sy'n absennol yn gyffredinol. Er enghraifft:
    • backfill angen ailgychwyn achosion tasg.
      Er enghraifft, daeth dadansoddwyr a dweud: “Ac mae gennych chi, gymrawd, nonsens yn y data o Ionawr 1 i 13! Trwsio, trwsio, ei drwsio, ei drwsio!" Ac rydych chi'n gymaint o hob:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Gwasanaeth sylfaenol: initdb, resetdb, upgradedb, checkdb.
    • run, sy'n eich galluogi i redeg un dasg enghraifft, a hyd yn oed sgorio ar bob dibyniaeth. Ar ben hynny, gallwch chi ei redeg trwy LocalExecutor, hyd yn oed os oes gennych glwstwr Seleri.
    • Yn gwneud yr un peth fwy neu lai test, yn unig hefyd mewn gwaelodion yn ysgrifennu dim.
    • connections caniatáu creu màs o gysylltiadau o'r gragen.
  • API Python - ffordd greiddiol braidd o ryngweithio, a fwriedir ar gyfer ategion, ac nid heidio ynddo gyda dwylo bach. Ond pwy sydd i'n rhwystro ni rhag mynd /home/airflow/dags, rhedeg ipython a dechrau chwarae o gwmpas? Gallwch, er enghraifft, allforio pob cysylltiad â'r cod canlynol:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Cysylltu â chronfa ddata meta-lif Air. Nid wyf yn argymell ysgrifennu ato, ond gall cael cyflyrau tasg ar gyfer amrywiol fetrigau penodol fod yn llawer cyflymach a haws na thrwy unrhyw un o'r APIs.

    Gadewch i ni ddweud nad yw pob un o'n tasgau yn analluog, ond gallant ddisgyn weithiau, ac mae hyn yn normal. Ond mae rhai rhwystrau eisoes yn amheus, a byddai angen gwirio.

    Byddwch yn ofalus SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

cyfeiriadau

Ac wrth gwrs, y deg dolen gyntaf o gyhoeddiad Google yw cynnwys y ffolder Airflow o'm nodau tudalen.

A'r dolenni a ddefnyddir yn yr erthygl:

Ffynhonnell: hab.com