Apache Airflow: A 'dèanamh ETL nas fhasa

Hi, is mise Dmitry Logvinenko - Einnseanair Dàta aig Roinn Analytics buidheann chompanaidhean Vezet.

Innsidh mi dhut mu inneal mìorbhaileach airson pròiseasan ETL a leasachadh - Apache Airflow. Ach tha Airflow cho ioma-chruthach agus ioma-thaobhach gum bu chòir dhut sùil nas mionaidiche a thoirt air eadhon ged nach eil thu an sàs ann an sruthan dàta, ach gu bheil feum agad air pròiseasan sam bith a chuir air bhog bho àm gu àm agus sùil a chumail air an coileanadh.

Agus tha, chan e a-mhàin innsidh mi, ach cuideachd seallaidh mi: tha tòrr còd, seallaidhean-sgrìn agus molaidhean aig a’ phrògram.

Apache Airflow: A 'dèanamh ETL nas fhasa
Na chì thu mar as trice nuair a nì thu google am facal Airflow / Wikimedia Commons

Clàr-innse

Ro-ràdh

Tha Apache Airflow dìreach mar Django:

  • sgrìobhte ann am python
  • tha pannal rianachd math ann,
  • a’ leudachadh gun chrìoch

- a-mhàin nas fheàrr, agus chaidh a dhèanamh airson adhbharan gu tur eadar-dhealaichte, is e sin (mar a tha e sgrìobhte ron kat):

  • a’ ruith agus a’ cumail sùil air gnìomhan air àireamh neo-chuingealaichte de dh’ innealan (mar a bheir mòran de Celery / Kubernetes agus do chogais dhut)
  • le gineadh sruth-obrach fiùghantach bho chòd Python a tha gu math furasta a sgrìobhadh agus a thuigsinn
  • agus an comas stòran-dàta agus APIan sam bith a cheangal ri chèile a’ cleachdadh an dà chuid co-phàirtean deiseil agus plugins dèanta aig an taigh (a tha gu math sìmplidh).

Bidh sinn a’ cleachdadh Apache Airflow mar seo:

  • bidh sinn a’ cruinneachadh dàta bho dhiofar thobraichean (mòran eisimpleirean SQL Server agus PostgreSQL, diofar APIan le metrics tagraidh, eadhon 1C) ann an DWH agus ODS (tha Vertica agus Clickhouse againn).
  • cho adhartach cron, a thòisicheas pròiseasan daingneachadh dàta air an ODS, agus cuideachd a 'cumail sùil air an cumail suas.

Gu ruige o chionn ghoirid, bha na feumalachdan againn air an còmhdach le aon fhrithealaiche beag le 32 cores agus 50 GB de RAM. Ann an Airflow, tha seo ag obair:

  • более 200 dag (gu dearbh sruthan-obrach, anns an do lìon sinn gnìomhan),
  • anns gach aon gu cuibheasach 70 obair,
  • tha am maitheas seo a 'tòiseachadh (cuideachd gu cuibheasach) uair san uair.

Agus mu mar a leudaich sinn, sgrìobhaidh mi gu h-ìosal, ach a-nis mìnichidh sinn an duilgheadas über a dh’ fhuasglas sinn:

Tha trì frithealaichean SQL tùsail ann, gach fear le stòran-dàta 50 - eisimpleirean de aon phròiseact, fa leth, tha an aon structar aca (cha mhòr anns a h-uile àite, mua-ha-ha), a tha a’ ciallachadh gu bheil clàr Òrdughan aig gach fear (gu fortanach, clàr le sin faodar ainm a phutadh a-steach do ghnìomhachas sam bith). Bidh sinn a’ toirt an dàta le bhith a’ cur raointean seirbheis ris (frithealaiche stòr, stòr-dàta stòr, ID gnìomh ETL) agus gan tilgeil a-steach gu naive, can, Vertica.

Leamamaid!

Am prìomh phàirt, practaigeach (agus beagan teòiridheach)

Carson a tha sinne (agus thusa)

Nuair a bha na craobhan mòr agus bha mi sìmplidh SQL-schik ann an aon reic Ruiseanach, rinn sinn sgam air pròiseasan ETL aka sruthan dàta a’ cleachdadh dà inneal a bha rim faighinn dhuinn:

  • Ionad cumhachd Informatica - siostam air leth sgaoileadh, air leth cinneasach, le bathar-cruaidh fhèin, dreach fhèin. Chleachd mi Dia a’ toirmeasg 1% de na comasan aige. Carson? Uill, an toiseach, chuir an eadar-aghaidh seo, am badeigin bho na 380n, cuideam inntinn oirnn. San dàrna h-àite, tha an contraption seo air a dhealbhadh airson pròiseasan air leth sùbailte, ath-chleachdadh phàirtean feargach agus cleasan iomairt eile a tha fìor chudromach. Mu dheidhinn gu bheil e a’ cosg, mar sgiath an Airbus AXNUMX / bliadhna, cha bhith sinn ag ràdh dad.

    Thoir an aire, faodaidh dealbh-sgrìn beagan a ghoirteachadh do dhaoine fo 30

    Apache Airflow: A 'dèanamh ETL nas fhasa

  • SQL Server Integration Server - chleachd sinn a’ chompanach seo nar sruthan taobh a-staigh a’ phròiseict. Uill, gu dearbh: tha sinn mu thràth a’ cleachdadh SQL Server, agus bhiodh e dòigh air choireigin mì-reusanta gun na h-innealan ETL aca a chleachdadh. Tha a h-uile dad math: tha an dà chuid an eadar-aghaidh brèagha, agus tha an adhartas ag aithris ... Ach chan e seo as coireach gu bheil sinn dèidheil air bathar-bog, o, chan ann airson seo. Tionndadh e dtsx (is e sin XML le nodan air an gluasad air sàbhaladh) is urrainn dhuinn, ach dè a’ phuing a th’ ann? Dè mu dheidhinn pasgan gnìomh a dhèanamh a tharraingeas ceudan de chlàran bho aon fhrithealaiche gu frithealaiche eile? Seadh, dè ceud, tuitidh do mheur-chlàr à fichead pìos, a’ briogadh air putan na luchaige. Ach tha e gu cinnteach a’ coimhead nas fhasanta:

    Apache Airflow: A 'dèanamh ETL nas fhasa

Bha sinn gu cinnteach a’ coimhead airson dòighean a-mach. Cùis eadhon cha mhòr thàinig mi gu gineadair pacaid SSIS fèin-sgrìobhte ...

…agus an uairsin lorg obair ùr mi. Agus thug Apache Airflow thairis mi air.

Nuair a fhuair mi a-mach gur e còd Python sìmplidh a th’ ann an tuairisgeulan pròiseas ETL, cha do rinn mi dannsa airson toileachas. Seo mar a bha sruthan dàta air an tionndadh agus air an eadar-dhealachadh, agus thàinig dòrtadh bùird le aon structar bho cheudan de stòran-dàta gu aon targaid gu bhith na chùis de chòd Python ann an aon gu leth no dhà de scrionaichean 13 ″.

A 'cruinneachadh na buidhne

Nach cuir sinn air dòigh sgoil-àraich gu tur, agus na bi a’ bruidhinn mu dheidhinn rudan gu tur follaiseach an seo, leithid stàladh Airflow, an stòr-dàta a thagh thu, Soilleir agus cùisean eile a tha air am mìneachadh anns na docaichean.

Gus an urrainn dhuinn deuchainnean a thòiseachadh sa bhad, rinn mi sgeidse docker-compose.yml anns a bheil:

  • Dìreach togaidh sinn suas Airflow: Clàr-ama, frithealaiche lìn. Bidh Flower cuideachd a’ snìomh an sin gus sùil a chumail air gnìomhan soilire (seach gu bheil e air a phutadh a-steach mu thràth apache/airflow:1.10.10-python3.7, ach chan eil dragh againn)
  • PostgreSQL, anns am bi Airflow a’ sgrìobhadh am fiosrachadh seirbheis aige (dàta clàraiche, staitistig cur an gnìomh, msaa), agus comharraichidh Celery gnìomhan crìochnaichte;
  • Redis, a bhios mar neach-malairt gnìomhan airson Celery;
  • Neach-obrach soilire, a bhios an sàs ann an coileanadh dìreach gnìomhan.
  • Gu pasgan ./dags cuiridh sinn na faidhlichean againn leis an tuairisgeul air dags. Thèid an togail air an itealan, agus mar sin chan fheumar an stac gu lèir a cheangal às deidh gach sreothartaich.

Ann an cuid de dh'àiteachan, chan eil an còd anns na h-eisimpleirean air a shealltainn gu tur (gus nach cuir thu dragh air an teacsa), ach an àiteigin tha e air atharrachadh sa phròiseas. Gheibhear eisimpleirean de chòd obrachaidh iomlan anns an stòr https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notaichean:

  • Ann an co-chruinneachadh an sgrìobhaidh, bha mi gu mòr an urra ris an ìomhaigh ainmeil puclach/ docker-sruth-adhair - bi cinnteach gun toir thu sùil air. Is dòcha nach eil feum agad air dad sam bith eile nad bheatha.
  • Tha a h-uile suidheachadh Airflow ri fhaighinn chan ann a-mhàin troimhe airflow.cfg, ach cuideachd tro chaochladairean àrainneachd (taing don luchd-leasachaidh), a ghabh mi gu mì-fhortanach brath air.
  • Gu nàdarra, chan eil e deiseil airson cinneasachadh: cha do chuir mi buillean cridhe air soithichean a dh’aona ghnothach, cha do chuir mi dragh air tèarainteachd. Ach rinn mi an ìre as lugha a bha iomchaidh airson ar luchd-deuchainn.
  • Thoir fa-near:
    • Feumaidh am pasgan dag a bhith ruigsinneach don chlàr-ama agus don luchd-obrach.
    • Tha an aon rud a 'buntainn ris a h-uile treas-phàrtaidh leabharlannan - feumaidh iad uile a bhith air a stàladh air innealan le clàr-ama agus luchd-obrach.

Uill, a-nis tha e sìmplidh:

$ docker-compose up --scale worker=3

Às deidh a h-uile càil èirigh, faodaidh tu coimhead air an eadar-aghaidh lìn:

Bun-bheachdan bunaiteach

Mura do thuig thu dad anns na “dagaichean” sin uile, seo faclair goirid:

  • Prògramadair - an uncail as cudromaiche ann an Airflow, a bhios a 'riaghladh gu bheil innealan-fuadain ag obair gu cruaidh, agus chan e duine: a' cumail sùil air a 'chlàr-ama, ag ùrachadh bhiodagan, a' cur air bhog gnìomhan.

    San fharsaingeachd, ann an dreachan nas sine, bha duilgheadasan aige le cuimhne (chan e, chan e amnesia, ach aoidion) agus dh’ fhan am paramadair dìleab eadhon anns na configs run_duration - an eadar-ama ath-thòiseachadh aige. Ach a-nis tha a h-uile dad gu math.

  • DAG (aka “dag”) - “graf acyclic stiùirichte”, ach innsidh mìneachadh mar seo glè bheag de dhaoine, ach gu dearbh tha e na ghobhar airson gnìomhan a tha ag eadar-obrachadh le chèile (faic gu h-ìosal) no analogue de phasgan ann an SSIS agus Sruth-obrach ann an Informatica .

    A bharrachd air dagaichean, 's dòcha gum bi fo-bhuidhnean ann fhathast, ach is coltaiche nach fhaigh sinn thuca.

  • DAG Ruith - daga tòiseachaidh, a tha air a shònrachadh fhèin execution_date. Faodaidh dagrans den aon bhiod obrachadh aig an aon àm (ma tha thu air do ghnìomhan a dhèanamh neo-chomasach, gu dearbh).
  • Operator nam pìosan còd le uallach airson gnìomh sònraichte a choileanadh. Tha trì seòrsaichean de ghnìomhaichean ann:
    • gnìomhamar am fear as fheàrr leinn PythonOperator, as urrainn còd Python (dligheach) sam bith a chuir an gnìomh;
    • gluasad, a bhios a’ giùlan dàta bho àite gu àite, can, MsSqlToHiveTransfer;
    • mothachadh air an làimh eile, leigidh e leat freagairt no slaodadh sìos air coileanadh a’ bhiod a bharrachd gus an tachair tachartas. HttpSensor is urrainn dhaibh an ceann-uidhe ainmichte a tharraing, agus nuair a tha am freagairt a tha thu ag iarraidh a’ feitheamh, tòisich air a’ ghluasad GoogleCloudStorageToS3Operator. Bidh inntinn fiosrachail a’ faighneachd: “carson? Às deidh na h-uile, faodaidh tu ath-aithris a dhèanamh ceart anns a ’ghnìomhaiche!” Agus an uairsin, gus nach cuir thu bacadh air an raon de ghnìomhan le gnìomhaichean crochte. Bidh an sensor a’ tòiseachadh, a’ sgrùdadh agus a’ bàsachadh ron ath oidhirp.
  • Gnìomh - tha luchd-obrachaidh dearbhte, ge bith dè an seòrsa, agus a tha ceangailte ris a’ bhiod air an àrdachadh gu ìre na h-obrach.
  • eisimpleir de ghnìomhan - nuair a cho-dhùin an dealbhaiche coitcheann gu robh an t-àm ann gnìomhan a chuir gu cath air luchd-ciùil (dìreach san spot, ma chleachdas sinn LocalExecutor no gu nód iomallach ann an cùis CeleryExecutor), bidh e a’ sònrachadh co-theacs dhaibh (ie, seata de chaochladairean - paramadairean buileachaidh), a’ leudachadh teamplaidean àithne no ceist, agus gan cruinneachadh.

Bidh sinn a’ cruthachadh ghnìomhan

An toiseach, bheir sinn cunntas air sgeama coitcheann ar doug, agus an uairsin bidh sinn a ’dàibheadh ​​​​a-steach don fhiosrachadh barrachd is barrachd, oir bidh sinn a’ cleachdadh cuid de fhuasglaidhean nach eil cho beag.

Mar sin, anns an fhoirm as sìmplidhe, seallaidh a leithid de bhiodag mar seo:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Nach dèan sinn a-mach e:

  • An toiseach, bidh sinn a’ toirt a-steach na libs riatanach agus rud eile;
  • sql_server_ds A bheil List[namedtuple[str, str]] le ainmean nan ceanglaichean bho Airflow Connections agus na stòran-dàta às an toir sinn ar truinnsear;
  • dag — foillseachadh ar biodag, a dh' fheumas a bhi a stigh globals(), air neo cha lorgar Airflow e. Feumaidh Doug cuideachd a ràdh:
    • ciod is ainm dha orders - nochdaidh an t-ainm seo an uairsin anns an eadar-aghaidh lìn,
    • gun oibrich e o mheadhon oidhche air an ochdamh la de'n Iuchar,
    • agus bu chòir dha ruith, timcheall air a h-uile 6 uairean (airson balaich duilich an seo an àite timedelta() ceadaichte cron- loidhne 0 0 0/6 ? * * *, airson an fheadhainn nach eil cho fionnar - abairt mar @daily);
  • workflow() nì am prìomh obair, ach chan ann a-nis. Airson a-nis, bidh sinn dìreach a’ tilgeil ar co-theacsa dhan loga.
  • Agus a-nis an draoidheachd shìmplidh airson gnìomhan a chruthachadh:
    • tha sinn a' ruith troimh ar tobraichean ;
    • toiseachadh PythonOperator, a chuireas an gnìomh ar dummy workflow(). Na dì-chuimhnich ainm sònraichte (taobh a-staigh an daga) den obair a shònrachadh agus an daga fhèin a cheangal. Bratach provide_context an uair sin, dòirtidh sinn argamaidean a bharrachd a-steach don ghnìomh, a chruinnicheas sinn gu faiceallach a’ cleachdadh **context.

Airson a-nis, tha sin uile. Na fhuair sinn:

  • dag ùr san eadar-aghaidh lìn,
  • ceud gu leth gnìomh a thèid a chuir gu bàs aig an aon àm (ma cheadaicheas na roghainnean Airflow, Celery agus comas frithealaiche).

Uill, cha mhòr nach d’ fhuair.

Apache Airflow: A 'dèanamh ETL nas fhasa
Cò a stàlaicheas na h-eisimeileachd?

Gus an rud gu lèir seo a dhèanamh nas sìmplidhe, chuir mi a-steach e docker-compose.yml giollachd requirements.txt air na h-uile nithibh.

A-nis tha e air falbh:

Apache Airflow: A 'dèanamh ETL nas fhasa

Tha ceàrnagan glasa nan eisimpleirean gnìomh air an giullachd leis a’ chlàr-ama.

Bidh sinn a 'feitheamh beagan, bidh an luchd-obrach a' toirt seachad na gnìomhan:

Apache Airflow: A 'dèanamh ETL nas fhasa

Tha an fheadhainn uaine, gu dearbh, air an obair a chrìochnachadh gu soirbheachail. Chan eil dearg gu math soirbheachail.

Co-dhiù, chan eil pasgan sam bith air ar prod ./dags, chan eil sioncronadh eadar innealan - tha na dagaichean uile nan laighe a-steach git air ar Gitlab, agus bidh Gitlab CI a’ sgaoileadh ùrachaidhean gu innealan nuair a thig iad còmhla master.

Beagan mu dheidhinn Flower

Fhad ‘s a tha an luchd-obrach a’ bualadh air na pacifiers againn, cuimhnichidh sinn inneal eile a sheallas rudeigin dhuinn - Flower.

A’ chiad duilleag le fiosrachadh geàrr-chunntas air nodan luchd-obrach:

Apache Airflow: A 'dèanamh ETL nas fhasa

An duilleag as dian le gnìomhan a chaidh gu obair:

Apache Airflow: A 'dèanamh ETL nas fhasa

An duilleag as dorra le inbhe ar broker:

Apache Airflow: A 'dèanamh ETL nas fhasa

Tha an duilleag as soilleire le grafaichean inbhe gnìomh agus an ùine cur gu bàs:

Apache Airflow: A 'dèanamh ETL nas fhasa

Bidh sinn a 'luchdachadh an fheadhainn nach eil air an luchdachadh

Mar sin, gu bheil na gnìomhan gu lèir air obrachadh a-mach, faodaidh tu an fheadhainn leòinte a thoirt air falbh.

Apache Airflow: A 'dèanamh ETL nas fhasa

Agus bha mòran leònte - airson adhbhar air choireigin. A thaobh cleachdadh ceart de Airflow, tha na fìor cheàrnagan sin a’ nochdadh nach do ràinig an dàta gu cinnteach.

Feumaidh tu coimhead air an loga agus ath-thòiseachadh na h-eisimpleirean gnìomh a tha air tuiteam.

Le bhith a’ cliogadh air ceàrnag sam bith, chì sinn na gnìomhan a tha rim faighinn dhuinn:

Apache Airflow: A 'dèanamh ETL nas fhasa

Faodaidh tu an fheadhainn a thuit a ghabhail agus a dhèanamh Glan. Is e sin, bidh sinn a’ dìochuimhneachadh gu bheil rudeigin air fàiligeadh an sin, agus thèid an aon ghnìomh eisimpleir chun chlàr-ama.

Apache Airflow: A 'dèanamh ETL nas fhasa

Tha e soilleir nach eil e gu math daonnach a bhith a’ dèanamh seo leis an luchag leis na ceàrnagan dearga gu lèir - chan e seo a tha sinn a’ dùileachadh bho Airflow. Gu nàdarra, tha armachd lèir-sgrios againn: Browse/Task Instances

Apache Airflow: A 'dèanamh ETL nas fhasa

Taghamaid a h-uile càil aig an aon àm agus ath-shuidhich sinn gu neoni, cliog air an rud cheart:

Apache Airflow: A 'dèanamh ETL nas fhasa

Às deidh glanadh, tha na tacsaidhean againn a ’coimhead mar seo (tha iad mu thràth a’ feitheamh ris an neach-clàraidh an clàradh):

Apache Airflow: A 'dèanamh ETL nas fhasa

Ceanglaichean, dubhan agus caochladairean eile

Tha an t-àm ann coimhead air an ath DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

A bheil a h-uile duine a-riamh air ùrachadh aithisg a dhèanamh? Seo i a-rithist: tha liosta de thobraichean bho far am faighear an dàta; tha liosta ann far an cuir thu; na dì-chuimhnich urram a thoirt nuair a thachair no a bhris a h-uile càil (uill, chan eil seo mu ar deidhinn, chan eil).

Rachamaid tron ​​​​fhaidhle a-rithist agus coimhead air na rudan ùra doilleir:

  • from commons.operators import TelegramBotSendMessage - chan eil dad a’ cur stad oirnn bho bhith a’ dèanamh ar gnìomhaichean fhèin, rud a ghabh sinn brath air le bhith a’ dèanamh pasgan beag airson teachdaireachdan a chuir gu Unblocked. (Bruidhnidh sinn barrachd mun ghnìomhaiche seo gu h-ìosal);
  • default_args={} - faodaidh dag na h-aon argamaidean a sgaoileadh air a h-uile gnìomhaiche;
  • to='{{ var.value.all_the_kings_men }}' - achadh to cha bhith sinn air còd cruaidh, ach air a ghineadh gu dinamach a’ cleachdadh Jinja agus caochladair le liosta de phuist-d, a chuir mi a-steach gu faiceallach Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - suidheachadh airson an gnìomhaiche a thòiseachadh. Anns a 'chùis againn, cha tèid an litir gu na ceannardan a-mhàin ma tha a h-uile eisimeileachd air obrachadh a-mach gu soirbheachail;
  • tg_bot_conn_id='tg_main' - argamaidean conn_id gabh ri IDan ceangail a chruthaicheas sinn ann Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - cha tèid teachdaireachdan ann an Telegram air falbh ach ma tha gnìomhan air tuiteam;
  • task_concurrency=1 - bidh sinn a’ toirmeasg grunn eisimpleirean gnìomh de aon ghnìomh a chuir air bhog aig an aon àm. Rud eile, gheibh sinn grunn chuir air bhog aig an aon àm VerticaOperator (a 'coimhead air aon bhòrd);
  • report_update >> [email, tg] - uile VerticaOperator tighinn còmhla ann a bhith a’ cur litrichean is teachdaireachdan, mar seo:
    Apache Airflow: A 'dèanamh ETL nas fhasa

    Ach leis gu bheil suidheachaidhean tòiseachaidh eadar-dhealaichte aig gnìomhaichean fios, chan obraich ach aon. Ann an Tree View, tha a h-uile dad a’ coimhead beagan nas lugha de lèirsinn:
    Apache Airflow: A 'dèanamh ETL nas fhasa

Canaidh mi beagan fhaclan mu dheidhinn macros agus an caraidean - caochladairean.

Tha Macros nan luchd-àite Jinja a dh’ fhaodas diofar fhiosrachadh feumail a chuir an àite argamaidean gnìomhaiche. Mar eisimpleir, mar seo:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} leudaichidh e gu susbaint an caochladair co-theacsa execution_date anns a ’chruth YYYY-MM-DD: 2020-07-14. Is e am pàirt as fheàrr gu bheil caochladairean co-theacsa air an ceangal ri eisimpleir gnìomh sònraichte (ceàrnag ann an Tree View), agus nuair a thèid ath-thòiseachadh, leudaichidh an luchd-àite gu na h-aon luachan.

Faodar na luachan ainmichte fhaicinn a’ cleachdadh a’ phutan Rendered air gach eisimpleir gnìomh. Seo mar a tha an obair le bhith a’ cur litir:

Apache Airflow: A 'dèanamh ETL nas fhasa

Agus mar sin aig an obair le bhith a’ cur teachdaireachd:

Apache Airflow: A 'dèanamh ETL nas fhasa

Tha liosta iomlan de macros togte airson an dreach as ùire a tha ri fhaighinn ri fhaighinn an seo: iomradh macros

A bharrachd air an sin, le cuideachadh bho plugins, is urrainn dhuinn na macros againn fhèin ainmeachadh, ach sin sgeulachd eile.

A bharrachd air na rudan ro-mhìnichte, is urrainn dhuinn luachan ar caochladairean a chuir an àite (chleachd mi seo sa chòd gu h-àrd mu thràth). Cruthaichidh sinn a-steach Admin/Variables rud no dhà:

Apache Airflow: A 'dèanamh ETL nas fhasa

A h-uile rud as urrainn dhut a chleachdadh:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Faodaidh an luach a bhith na sgalar, no faodaidh e a bhith JSON cuideachd. A thaobh JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

dìreach cleachd an t-slighe chun an iuchair a tha thu ag iarraidh: {{ var.json.bot_config.bot.token }}.

Canaidh mi aon fhacal gu litireil agus seallaidh mi aon dealbh-sgrìn mu dheidhinn ceanglaichean. Tha a h-uile dad bunaiteach an seo: air an duilleag Admin/Connections bidh sinn a’ cruthachadh ceangal, a’ cur ar logins / faclan-faire agus paramadairean nas sònraichte an sin. Mar seo:

Apache Airflow: A 'dèanamh ETL nas fhasa

Faodar faclan-faire a chrioptachadh (nas mionaidiche na an àbhaist), no faodaidh tu an seòrsa ceangail fhàgail a-mach (mar a rinn mi airson tg_main). ainm.

Faodaidh tu cuideachd grunn cheanglaichean a dhèanamh leis an aon ainm: sa chùis seo, am modh BaseHook.get_connection(), a gheibh ceanglaichean dhuinn le ainm, bheir air thuaiream bho ghrunn ainmean (bhiodh e na bu loidsigeach Round Robin a dhèanamh, ach fàgamaid e air cogais luchd-leasachaidh Airflow).

Tha caochlaidhean agus Ceanglaichean gu cinnteach nan innealan fionnar, ach tha e cudromach gun a bhith a 'call an cothromachadh: dè na pàirtean de na sruthan agad a bhios tu a' stòradh sa chòd fhèin, agus dè na pàirtean a bheir thu dha Airflow airson a stòradh. Air an aon làimh, faodaidh e a bhith goireasach an luach atharrachadh gu sgiobalta, mar eisimpleir, bogsa puist, tron ​​​​UI. Air an làimh eile, tha seo fhathast na thilleadh gu cliog na luchaige, às an robh sinn (mi) airson faighinn cuidhteas.

Is e obair le ceanglaichean aon de na gnìomhan dubhan. San fharsaingeachd, tha dubhan Airflow nam puingean airson a cheangal ri seirbheisean treas-phàrtaidh agus leabharlannan. m.e., JiraHook fosglaidh neach-dèiligidh dhuinn gus eadar-obrachadh le Jira (faodaidh tu gnìomhan a ghluasad air ais is air adhart), agus le cuideachadh bho SambaHook faodaidh tu faidhle ionadail a phutadh gu smb-puing.

A 'parsadh a' ghnìomhaiche àbhaisteach

Agus thàinig sinn faisg air coimhead air mar a tha e air a dhèanamh TelegramBotSendMessage

còd a ' commons/operators.py leis an fhìor ghnìomhaiche:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

An seo, mar a h-uile càil eile ann an Airflow, tha a h-uile dad gu math sìmplidh:

  • Air a shealbhachadh bho BaseOperator, a bhios a’ buileachadh grunn rudan a tha sònraichte do Airflow (seall air do chur-seachad)
  • Raointean ainmichte template_fields, anns am bi Jinja a’ coimhead airson macros airson a phròiseasadh.
  • Chuir e air dòigh na h-argamaidean ceart airson __init__(), suidhich na roghainnean bunaiteach far a bheil sin riatanach.
  • Cha do dhìochuimhnich sinn mu thùs an sinnsear nas motha.
  • Dh’ fhosgail an dubhan co-fhreagarrach TelegramBotHookfhuair e nì neach-dèiligidh bhuaithe.
  • Modh ath-mhìnichte (ath-mhìnichte). BaseOperator.execute(), a bhios Airfow a’ tionndadh nuair a thig an t-àm airson a’ ghnìomhaiche a chuir air bhog - annta cuiridh sinn am prìomh ghnìomh an gnìomh, a’ dìochuimhneachadh logadh a-steach. (Bidh sinn a’ logadh a-steach, co-dhiù, dìreach a-steach stdout и stderr - Bidh sruth-adhair a’ toirt a-steach a h-uile càil, ga phasgadh gu breagha, ga lobhadh far a bheil sin riatanach.)

Chì sinn na th’ againn commons/hooks.py. A’ chiad phàirt den fhaidhle, leis an dubhan fhèin:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Chan eil fios agam eadhon dè a mhìnicheas mi an seo, bheir mi fa-near na puingean cudromach:

  • Bidh sinn a 'sealbhachadh, a' smaoineachadh mu na h-argamaidean - sa mhòr-chuid de chùisean bidh e mar aon: conn_id;
  • A’ dol thairis air modhan àbhaisteach: chuir mi bacadh orm fhìn get_conn(), anns am faigh mi na paramadairean ceangail a rèir ainm agus dìreach faigh an earrann extra (is e seo raon JSON), anns an do chuir mi (a rèir an stiùiridh agam fhìn!) an tòcan bot Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Bidh mi a’ cruthachadh eisimpleir de ar TelegramBot, a 'toirt comharradh sònraichte dha.

Sin e. Gheibh thu neach-dèiligidh bho dubhan a 'cleachdadh TelegramBotHook().clent no TelegramBotHook().get_conn().

Agus an dàrna pàirt den fhaidhle, anns am bi mi a’ dèanamh microwrapper airson an Telegram REST API, gus nach slaod mi an aon rud python-telegram-bot airson aon dòigh sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Is e an dòigh cheart a h-uile càil a chur ris: TelegramBotSendMessage, TelegramBotHook, TelegramBot - anns a’ plugan, cuir a-steach stòr poblach, agus thoir gu Open Source e.

Fhad ‘s a bha sinn a’ sgrùdadh seo gu lèir, chaidh aig na h-ùrachaidhean aithisg againn air fàiligeadh gu soirbheachail agus chuir iad teachdaireachd mearachd thugam san t-sianal. Tha mi a’ dol a choimhead feuch a bheil e ceàrr...

Apache Airflow: A 'dèanamh ETL nas fhasa
Bhris rudeigin nar cù! Nach e sin a bha sinn an dùil? Dìreach!

A bheil thu a’ dol a dhòrtadh?

A bheil thu a’ faireachdainn gun do chaill mi rudeigin? Tha e coltach gun do gheall e dàta a ghluasad bho SQL Server gu Vertica, agus an uairsin ghabh e e agus ghluais e far a 'chuspair, an scoundrel!

Bha an uamhas seo a dh’aona ghnothach, cha robh agam ach beagan briathrachais a mhìneachadh dhut. A-nis faodaidh tu a dhol nas fhaide.

B’ e seo ar plana:

  1. Dèan daga
  2. Cruthaich gnìomhan
  3. Faic cho breagha sa tha a h-uile dad
  4. Sònraich àireamhan seisean ri lìonadh
  5. Faigh dàta bho SQL Server
  6. Cuir dàta ann an Vertica
  7. Cruinnich staitistig

Mar sin, airson seo a thoirt gu buil, chuir mi beagan ris an fheadhainn againn docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

An sin tha sinn a 'togail:

  • Vertica mar aoigh dwh leis na roghainnean as bunaitiche,
  • trì eisimpleirean de SQL Server,
  • bidh sinn a’ lìonadh na stòran-dàta anns an fhear mu dheireadh le beagan dàta (gun fhios nach coimhead thu a-steach mssql_init.py!)

Bidh sinn a’ cur air bhog a h-uile rud math le cuideachadh bho àithne beagan nas iom-fhillte na an turas mu dheireadh:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Na chruthaich an randomizer mìorbhuileach againn, faodaidh tu an rud a chleachdadh Data Profiling/Ad Hoc Query:

Apache Airflow: A 'dèanamh ETL nas fhasa
Is e am prìomh rud gun a bhith ga shealltainn do luchd-anailis

mion-sgrùdadh air Seiseanan ETL Cha dèan, tha a h-uile dad beag an sin: bidh sinn a’ dèanamh bunait, tha soidhne ann, bidh sinn a’ cuairteachadh a h-uile càil le manaidsear co-theacsa, agus a-nis bidh sinn a’ dèanamh seo:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

seisean.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Tha an t-àm air tighinn cruinnich ar dàta o'n ceud gu leth clàr. Feuch an dèan sinn seo le cuideachadh bho loidhnichean fìor mhì-mhisneachail:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Le cuideachadh bho dubhan gheibh sinn bho Airflow pymssql- ceangal
  2. Leig leinn cuingealachadh ann an cruth ceann-latha a chuir a-steach don iarrtas - thèid a thilgeil a-steach don ghnìomh leis an einnsean teamplaid.
  3. A 'biathadh ar n-iarrtas pandascò gheibh sinn DataFrame - bidh e feumail dhuinn san àm ri teachd.

Tha mi a’ cleachdadh ionadachadh {dt} an àite paramadair iarrtas %s chan ann a chionn 's gur e Pinocchio olc a th' annam, ach air sgàth pandas chan urrainn a làimhseachadh pymssql agus sleamhnaich am fear mu dheireadh params: Listged a tha e dha-rìribh ag iarraidh tuple.
Thoir fa-near cuideachd gu bheil an leasaiche pymssql cho-dhùin e gun a bhith a’ toirt taic dha tuilleadh, agus tha an t-àm ann gluasad a-mach pyodbc.

Chì sinn dè a lìon Airflow argamaidean ar gnìomhan le:

Apache Airflow: A 'dèanamh ETL nas fhasa

Mura h-eil dàta ann, chan eil adhbhar ann leantainn air adhart. Ach tha e neònach cuideachd beachdachadh air an lìonadh soirbheachail. Ach chan e mearachd a tha seo. A-ah-ah, dè a nì thu?! Agus seo na tha:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException ag innse do Airflow nach eil mearachdan ann, ach gun leum sinn air a’ ghnìomh. Cha bhi ceàrnag uaine no dearg air an eadar-aghaidh, ach pinc.

Tilgeamaid an dàta againn ioma colbhan:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Is e sin:

  • An stòr-dàta às an do ghabh sinn na h-òrdughan,
  • ID den t-seisean tuiltean againn (bidh e eadar-dhealaichte airson gach obair),
  • Hash bhon stòr agus ID òrduigh - gus am bi ID òrdugh sònraichte againn anns an stòr-dàta mu dheireadh (far a bheil a h-uile càil air a dhòrtadh ann an aon bhòrd).

Tha an dàrna ceum fhathast: dòrtadh a h-uile càil a-steach gu Vertica. Agus, gu neònach gu leòr, is e CSV aon de na dòighean as iongantaiche agus as èifeachdaiche seo a dhèanamh!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Tha sinn a’ dèanamh inneal-glacaidh sònraichte StringIO.
  2. pandas cuiridh sinn gu caoimhneil DataFrame anns an riochd CSV-loidhnichean.
  3. Fosglaidh sinn ceangal ris an Vertica as fheàrr leinn le dubhan.
  4. Agus a-nis le cuideachadh copy() cuir an dàta againn gu dìreach gu Vertika!

Bheir sinn bhon draibhear cia mheud loidhne a chaidh a lìonadh, agus innis do mhanaidsear an t-seisein gu bheil a h-uile dad ceart gu leòr:

session.loaded_rows = cursor.rowcount
session.successful = True

Sin e.

Air an reic, bidh sinn a 'cruthachadh a' phlàta targaid le làimh. An seo leig mi leam inneal beag:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Tha mi a 'cleachdadh VerticaOperator() Bidh mi a’ cruthachadh sgeama stòr-dàta agus clàr (mura h-eil iad ann mar-thà, gu dearbh). Is e am prìomh rud na eisimeileachd a chuir air dòigh gu ceart:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

A 'togail suas

— Uill, — ars' an luch bheag, — nach 'eil, a nis
A bheil thu cinnteach gur mise am beathach as uamhasach sa choille?

Julia Donaldson, An Gruffalo

Tha mi a’ smaoineachadh nam biodh farpais aig mo cho-obraichean agus mise: cò a chruthaicheas agus a chuireas air bhog pròiseas ETL bhon fhìor thoiseach: iadsan leis an SSIS agus an luchag agus mise le Airflow ... Agus an uairsin bhiodh sinn cuideachd a’ dèanamh coimeas eadar cho furasta agus a tha e cumail suas ... Wow, tha mi a’ smaoineachadh gun aontaich thu gun dèan mi a’ chùis orra air a h-uile taobh!

Ma tha e beagan nas cudromaiche, rinn Apache Airflow - le bhith a’ toirt cunntas air pròiseasan ann an cruth còd prògram - an obair agam mòran nas comhfhurtail agus nas tlachdmhoire.

Tha an leudachadh gun chrìoch, an dà chuid a thaobh plug-ins agus ro-shealladh air scalability, a’ toirt cothrom dhut sruth-adhair a chleachdadh ann an cha mhòr raon sam bith: eadhon anns a ’chearcall iomlan de bhith a’ tional, ag ullachadh agus a ’giullachd dàta, eadhon ann a bhith a’ cur air bhog rocaidean (gu Mars, de cùrsa).

Pàirt dheireannach, iomradh agus fiosrachadh

An ràcan a chruinnich sinn dhut

  • start_date. Tha, is e meme ionadail a tha seo mu thràth. Tro phrìomh argamaid Doug start_date uile seachad. Ann an ùine ghoirid, ma shònraicheas tu a-steach start_date ceann-latha làithreach, agus schedule_interval - aon latha, an uairsin tòisichidh DAG a-màireach gun a bhith nas tràithe.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Agus chan eil barrachd dhuilgheadasan ann.

    Tha mearachd runtime eile co-cheangailte ris: Task is missing the start_date parameter, a tha mar as trice a 'nochdadh gun do dhìochuimhnich thu ceangal a dhèanamh ris a' ghnìomhaiche biodag.

  • A h-uile còmhla ris air aon inneal. Tha, agus bunaitean (Airflow fhèin agus an còmhdach againn), agus frithealaiche lìn, agus clàr-ama, agus luchd-obrach. Agus dh'obraich e eadhon. Ach thar ùine, dh'fhàs an àireamh de ghnìomhan airson seirbheisean, agus nuair a thòisich PostgreSQL a 'freagairt ris a' chlàr-amais ann an 20 s an àite 5 ms, thug sinn air falbh e agus thug sinn air falbh e.
  • Gnìomhaiche Ionadail. Tha, tha sinn fhathast nar suidhe air, agus tha sinn mu thràth air tighinn gu oir an dubh-aigein. Tha LocalExecutor air a bhith gu leòr dhuinn gu ruige seo, ach a-nis tha an t-àm ann leudachadh le co-dhiù aon neach-obrach, agus feumaidh sinn a bhith ag obair gu cruaidh gus gluasad gu CeleryExecutor. Agus leis gun urrainn dhut obrachadh leis air aon inneal, chan eil dad a’ cur stad ort bho bhith a’ cleachdadh Celery eadhon air frithealaiche, nach bi “gu dearbh, a’ dol a-steach gu cinneasachadh, gu h-onarach! ”
  • Neo-chleachdadh innealan togte:
    • Connections gus teisteanasan seirbheis a stòradh,
    • SLA ag ionndrainn freagairt a thoirt do ghnìomhan nach do dh’obraich ann an tìde,
    • xcom airson iomlaid meata-dàta (thuirt mi metadàta!) eadar gnìomhan dag.
  • Mì-ghnàthachadh puist. Uill, dè as urrainn dhomh a ràdh? Chaidh rabhaidhean a chuir air dòigh airson gach ath-aithris de ghnìomhan a thuit. A-nis tha an obair agam aig Gmail > 90k puist-d bho Airflow, agus tha am post post-lìn a’ diùltadh barrachd air 100 a thogail agus a dhubhadh às aig an aon àm.

Barrachd dhuilgheadasan: Duilgheadasan sruth-adhair Apache

Barrachd innealan fèin-ghluasaid

Gus an obraich sinn eadhon nas motha le ar cinn agus chan ann le ar làmhan, tha Airflow air seo ullachadh dhuinn:

  • CÒRR API - tha inbhe Experimental aige fhathast, nach eil a 'cur bacadh air bho bhith ag obair. Leis, chan e a-mhàin gum faigh thu fiosrachadh mu bhiodagan agus gnìomhan, ach cuideachd stad / tòisich daga, cruthaich DAG Run no amar.
  • CLI - tha mòran innealan rim faighinn tron ​​​​loidhne-àithne nach eil dìreach mì-ghoireasach a chleachdadh tron ​​​​WebUI, ach a tha sa chumantas neo-làthaireach. Mar eisimpleir:
    • backfill a dhìth gus eisimpleirean de ghnìomhan ath-thòiseachadh.
      Mar eisimpleir, thàinig luchd-anailis agus thuirt iad: “Agus thusa, a chompanach, tha neòinean anns an dàta bho 1 Faoilleach gu 13! Ceartaich e, socraich e, socraich e, socraich e!” Agus tha thu nad hob mar sin:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Seirbheis bunaiteach: initdb, resetdb, upgradedb, checkdb.
    • run, a leigeas leat aon ghnìomh eisimpleir a ruith, agus eadhon sgòr a dhèanamh air a h-uile eisimeileachd. A bharrachd air an sin, faodaidh tu a ruith tro LocalExecutor, fiù 's ma tha brabhsair Celery agad.
    • A 'dèanamh gu ìre mhòr an aon rud test, a-mhàin cuideachd anns na bunaitean chan eil e a’ sgrìobhadh dad.
    • connections a 'ceadachadh ceanglaichean a chruthachadh bhon t-slige.
  • API Python - dòigh caran cruaidh air eadar-obrachadh, a thathar an dùil airson plugins, agus gun a bhith a’ snàmh ann le làmhan beaga. Ach cò a tha gus stad a chuir oirnn bho bhith a’ dol /home/airflow/dagsruith ipython agus tòisich a’ dol mun cuairt? Faodaidh tu, mar eisimpleir, às-mhalairt a h-uile ceangal leis a’ chòd a leanas:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • A’ ceangal ri stòr-dàta meata-dàta Airflow. Chan eil mi a’ moladh sgrìobhadh thuige, ach faodaidh a bhith a’ faighinn stàitean gnìomh airson grunn mheatairean sònraichte a bhith fada nas luaithe agus nas fhasa na tro aon de na APIan.

    Canaidh sinn nach eil a h-uile gnìomh againn neo-chomasach, ach uaireannan faodaidh iad tuiteam, agus tha seo àbhaisteach. Ach tha cuid de bhacaidhean mar-thà amharasach, agus bhiodh feum air sgrùdadh.

    Thoir an aire SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

iomraidhean

Agus gu dearbh, is e a’ chiad deich ceanglaichean bho sgaoileadh Google na tha anns a’ phasgan Airflow bho na comharran-leabhair agam.

Agus na ceanglaichean a chleachdar san artaigil:

Source: www.habr.com