Apache Airflow: ETL a dhéanamh níos éasca

Dia duit, is mise Dmitry Logvinenko - Innealtóir Sonraí de chuid na Roinne Analytics de ghrúpa cuideachtaí Vezet.

Inseoidh mé duit faoi uirlis iontach chun próisis ETL a fhorbairt - Apache Airflow. Ach tá Airflow chomh ilghnéitheach agus chomh ilghnéitheach sin gur chóir duit breathnú níos géire air fiú mura bhfuil baint agat le sreafaí sonraí, ach go bhfuil gá agat próisis ar bith a sheoladh go tréimhsiúil agus monatóireacht a dhéanamh ar a gcur i gcrích.

Agus tá, ní inseoidh mé ní hamháin, ach freisin taispeánfaidh mé: tá go leor cód, screenshots agus moltaí ag an gclár.

Apache Airflow: ETL a dhéanamh níos éasca
An rud a fheiceann tú de ghnáth nuair a dhéanann tú google an focal Airflow / Wikimedia Commons

Tábla na nÁbhar

Réamhrá

Tá Apache Airflow díreach cosúil le Django:

  • scríofa i python
  • tá painéal riaracháin iontach ann,
  • inmhéadaithe ar feadh tréimhse éiginnte

- ach níos fearr, agus rinneadh é chun críocha go hiomlán difriúil, eadhon (mar atá sé scríofa roimh an kata):

  • tascanna a rith agus monatóireacht a dhéanamh ar líon neamhtheoranta meaisíní (mar a cheadóidh go leor Soilire / Kubernetes agus do choinsiasa duit)
  • le giniúint sreabhadh oibre dinimiciúil ó chód Python an-éasca a scríobh agus a thuiscint
  • agus an cumas aon bhunachair shonraí agus APIanna a nascadh lena chéile trí úsáid a bhaint as comhpháirteanna réamhdhéanta agus forlíontáin de dhéantús baile (rud atá thar a bheith simplí).

Úsáidimid Apache Airflow mar seo:

  • bailímid sonraí ó fhoinsí éagsúla (go leor cásanna SQL Server agus PostgreSQL, APIanna éagsúla le méadracht feidhmchláir, fiú 1C) in DWH agus ODS (tá Vertica agus Clickhouse againn).
  • conas chun cinn cron, a chuireann tús leis na próisis chomhdhlúthaithe sonraí ar ODS, agus a dhéanann monatóireacht ar a gcothabháil freisin.

Go dtí le déanaí, bhí ár gcuid riachtanas clúdaithe ag freastalaí beag amháin le 32 cores agus 50 GB RAM. In Airflow, oibríonn sé seo:

  • níos mó 200 dag (sreafaí oibre i ndáiríre, inar líonta muid tascanna),
  • i ngach ar an meán 70 tasc,
  • tosaíonn an mhaitheas seo (ar an meán freisin) uair an chloig.

Agus faoin gcaoi ar mhéadaigh muid, scríobhfaidh mé thíos, ach anois déanaimis sainmhíniú ar an bhfadhb über a réiteoimid:

Tá trí Fhreastalaí SQL bunaidh, gach ceann acu le 50 bunachar sonraí - cásanna de thionscadal amháin, faoi seach, tá an struchtúr céanna acu (beagnach i ngach áit, mua-ha-ha), rud a chiallaíonn go bhfuil tábla Orduithe ag gach ceann acu (go fortunately, tábla leis sin is féidir ainm a bhrú isteach in aon ghnó). Glacaimid na sonraí trí réimsí seirbhíse (freastalaí foinse, bunachar sonraí foinse, ID tasc ETL) a chur leis agus caithimid go naive iad, abair, Vertica.

A ligean ar dul!

An phríomhchuid, praiticiúil (agus beagán teoiriciúil)

Cén fáth a bhfuil muid (agus tú)

Nuair a bhí na crainn mór agus bhí mé simplí SQL-schik i miondíol Rúisis amháin, rinneamar próisis ETL a scamadh mar a chéile le sreafaí sonraí ag baint úsáide as dhá uirlis atá ar fáil dúinn:

  • Lárionad Cumhachta Informatica - córas thar a bheith scaipthe, thar a bheith táirgiúil, lena crua-earraí féin, a leagan féin. Bhain mé úsáid as Dia forbid 1% dá cumais. Cén fáth? Bhuel, ar an gcéad dul síos, chuir an comhéadan seo, áit éigin ó na 380í, brú orainn go meabhrach. Ar an dara dul síos, tá an contraption seo deartha le haghaidh próisis an-mhaisiúla, athúsáid comhpháirte buile agus cleasanna eile atá thar a bheith tábhachtach don fhiontar. Maidir leis an méid a chosnaíonn sé, cosúil le sciathán an Airbus AXNUMX / bliain, ní déarfaimid rud ar bith.

    Bí ar an airdeall, is féidir le screenshot daoine faoi 30 a ghortú beagán

    Apache Airflow: ETL a dhéanamh níos éasca

  • Freastalaí Comhtháthaithe SQL Server - d'úsáideamar an comrádaí seo inár sreafaí laistigh den tionscadal. Bhuel, i ndáiríre: úsáidimid SQL Server cheana féin, agus bheadh ​​​​sé míréasúnta ar bhealach éigin gan a chuid uirlisí ETL a úsáid. Tá gach rud go maith: tá an comhéadan álainn araon, agus tuairiscíonn an dul chun cinn ... Ach ní hé seo an fáth go bhfuil grá againn ar tháirgí bogearraí, ó, ní le haghaidh seo. Leagan é dtsx (a bhfuil XML le nóid shuffled ar shábháil) is féidir linn, ach cad é an pointe? Cad faoi thasc-phacáiste a dhéanamh a tharraingeoidh na céadta tábla ó fhreastalaí amháin go freastalaí eile? Sea, cad céad, beidh do mhéar innéacs titim amach as fiche píosaí, cliceáil ar an cnaipe luiche. Ach is cinnte go bhféachann sé níos faiseanta:

    Apache Airflow: ETL a dhéanamh níos éasca

Is cinnte gur lorg muid bealaí amach. Cás fiú beagnach tháinig go dtí gineadóir pacáiste SSIS féinscríofa ...

…agus ansin fuair post nua mé. Agus rug Apache Airflow orm air.

Nuair a fuair mé amach go bhfuil cur síos ar phróiseas ETL cód simplí Python, ní raibh mé ag damhsa le haghaidh áthas. Seo é an chaoi a ndearnadh leaganacha agus difríocht de shruthanna sonraí, agus tháinig sé chun bheith ina ábhar de chód Python i gceann go leith nó dhá scáileán 13” táblaí le struchtúr amháin a dhoirteadh ó na céadta bunachar sonraí isteach i sprioc amháin.

Cnuasach an bhraisle

Ná déanaimis kindergarten go hiomlán a shocrú, agus gan labhairt faoi rudaí go hiomlán soiléir anseo, cosúil le Airflow a shuiteáil, do bhunachar sonraí roghnaithe, Soilire agus cásanna eile a thuairiscítear sna duganna.

Ionas gur féidir linn turgnaimh a thosú láithreach, rinne mé sceitse docker-compose.yml ina bhfuil:

  • A ligean ar a ardú i ndáiríre Sruth aeir: Sceidealóir, Freastalaí Gréasáin. Beidh Flower ag sníomh ann freisin chun monatóireacht a dhéanamh ar thascanna Soilire (toisc go bhfuil sé curtha isteach cheana féin apache/airflow:1.10.10-python3.7, ach is cuma linn)
  • PostgreSQL, ina scríobhfaidh Airflow a chuid faisnéise seirbhíse (sonraí sceidealaithe, staitisticí forghníomhaithe, etc.), agus marcálfaidh Soilire tascanna críochnaithe;
  • Redis, a fheidhmeoidh mar bhróicéir tasc do Soilire;
  • Oibrí soilire, a bheidh ag gabháil do chur i gcrích díreach cúraimí.
  • Go fillteán ./dags cuirfimid ár gcomhaid leis an tuairisc ar dags. Déanfar iad a phiocadh suas ar an eitilt, mar sin ní gá a juggle an chairn iomlán tar éis gach sraothartach.

I roinnt áiteanna, nach bhfuil an cód sna samplaí a thaispeáint go hiomlán (ionas nach a tranglam suas an téacs), ach áit éigin tá sé modhnaithe sa phróiseas. Is féidir samplaí de chóid oibre iomlána a fháil sa stór https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Nótaí:

  • I gcomhthionól an chomhdhéanamh, bhí mé ag brath go mór ar an íomhá aitheanta go maith puclach/dugaire-aersreabh - bí cinnte é a sheiceáil. B'fhéidir nach bhfuil aon rud eile ag teastáil uait i do shaol.
  • Tá gach socrú Airflow ar fáil ní hamháin tríd airflow.cfg, ach freisin trí athróga timpeallachta (a bhuíochas leis na forbróirí), ar bhain mé leas mailíseach astu.
  • Ar ndóigh, níl sé réidh le táirgeadh: níor chuir mé buille croí ar choimeádáin d'aon ghnó, níor bhac mé le slándáil. Ach rinne mé an t-íosmhéid oiriúnach dár turgnamhóirí.
  • Tabhair faoi deara:
    • Caithfidh an fillteán dag a bheith inrochtana don sceidealóir agus do na hoibrithe araon.
    • Baineann an rud céanna le gach leabharlann tríú páirtí - ní mór iad go léir a shuiteáil ar mheaisíní le sceidealóir agus oibrithe.

Bhuel, anois tá sé simplí:

$ docker-compose up --scale worker=3

Tar éis gach rud a ardú, is féidir leat breathnú ar na comhéadain gréasáin:

Bunchoincheapa

Más rud é nár thuig tú aon rud sna “dags” seo go léir, seo chugainn foclóir gairid:

  • Sceidealóir - an t-uncail is tábhachtaí i Airflow, ag rialú go n-oibríonn robots go crua, agus ní duine: déanann sé monatóireacht ar an sceideal, nuashonraíonn dags, seolann sé tascanna.

    Go ginearálta, i leaganacha níos sine, bhí fadhbanna aige le cuimhne (níl, ní amnesia, ach sceitheadh) agus d'fhan an paraiméadar oidhreachta fiú sna cumraíochtaí run_duration - a eatramh atosú. Ach anois tá gach rud go breá.

  • GCM (aka "dag") - "graf aicyclic faoi threoir", ach is beag duine a inseoidh sainmhíniú den sórt sin, ach i ndáiríre is coimeádán é le haghaidh tascanna a idirghníomhaíonn lena chéile (féach thíos) nó analóg de Phacáiste i SSIS agus Sreabhadh Oibre in Informatica .

    Chomh maith le daga, d'fhéadfadh go mbeadh fo-dhuillí ann fós, ach is dócha nach n-éireoidh linn iad.

  • Rith DAG - dag tosaithe, a sanntar a chuid féin execution_date. Is féidir le dagrans den dag céanna oibriú ag an am céanna (má tá do thascanna déanta agat gan mhoill, ar ndóigh).
  • oibreoir is píosaí cód iad atá freagrach as gníomh sonrach a dhéanamh. Tá trí chineál oibreoirí ann:
    • gníomhcosúil lenár is fearr leat PythonOperator, ar féidir leo aon chód Python (bailí) a fhorghníomhú;
    • aistriú, a iompraíonn sonraí ó áit go háit, abair, MsSqlToHiveTransfer;
    • braiteoir ar an láimh eile, ligfidh sé duit freagairt nó mhoilliú a dhéanamh ar fhorghníomhú breise an dag go dtí go dtarlaíonn teagmhas. HttpSensor is féidir leis an críochphointe sonraithe a tharraingt, agus nuair a bhíonn an freagra inmhianaithe ag fanacht, cuir tús leis an aistriú GoogleCloudStorageToS3Operator. Fiafróidh meon fiosrach: “cén fáth? Tar éis an tsaoil, is féidir leat athrá a dhéanamh díreach san oibreoir!” Agus ansin, ionas nach gcuirfear bac ar an linn tascanna le hoibreoirí ar fionraí. Tosaíonn an braiteoir, seiceálann agus bás roimh an gcéad iarracht eile.
  • Tasc - déantar oibreoirí dearbhaithe, beag beann ar an gcineál, agus atá ceangailte leis an dag a ardú go céim an taisc.
  • shampla tasc - nuair a chinn an pleanálaí ginearálta go raibh sé in am tascanna a chur chun catha ar taibheoirí-oibrithe (ar an bpointe, má úsáidimid LocalExecutor nó chuig nód iargúlta i gcás CeleryExecutor), sannann sé comhthéacs dóibh (i.e., sraith athróg - paraiméadair forghníomhaithe), leathnaíonn sé teimpléid ordaithe nó fiosrúcháin, agus comhthiomsaíonn sé iad.

Gineann muid tascanna

Ar dtús, déanaimis breac-chuntas ar scéim ghinearálta ár doug, agus ansin déanfaimid níos mó agus níos mó isteach ar na sonraí, toisc go gcuirimid roinnt réitigh neamhfhánacha i bhfeidhm.

Mar sin, sa bhfoirm is simplí, beidh cuma mar seo ar a leithéid de dag:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Déanaimis é a dhéanamh amach:

  • Gcéad dul síos, allmhairímid na libs riachtanacha agus rud éigin eile;
  • sql_server_ds - An bhfuil List[namedtuple[str, str]] le hainmneacha na nasc ó Airflow Connections agus na bunachair shonraí óna dtógfaimid ár bpláta;
  • dag - an fógra ár dag, ní mór a bheith riachtanach i globals(), nó ní bhfaighidh Airflow é. Ní mór do Doug a rá freisin:
    • Cad is ainm dó orders - beidh an t-ainm seo le feiceáil sa chomhéadan gréasáin ansin,
    • go n-oibreoidh sé ó mheadhon oidhche an t-ochtmhadh d'Iúl,
    • agus ba chóir é a rith, thart ar gach 6 uair an chloig (do guys diana anseo in ionad timedelta() inghlactha cron-líne 0 0 0/6 ? * * *, le haghaidh an níos lú fionnuar - slonn mar @daily);
  • workflow() Déanfaidh an príomh-jab, ach ní anois. Faoi láthair, ní dhéanfaimid ach ár gcomhthéacs a dhumpáil isteach sa loga.
  • Agus anois an draíocht simplí a bhaineann le tascanna a chruthú:
    • rithimid tríd ár bhfoinsí;
    • tosaigh PythonOperator, a fhorghníomhóidh ár Caochadán workflow(). Ná déan dearmad ainm uathúil (laistigh den dag) don tasc a shonrú agus an dag féin a cheangal. Bratach provide_context ina dhiaidh sin, Doirt argóintí breise isteach an fheidhm, a bheidh muid a bhailiú go cúramach ag baint úsáide as **context.

Go dtí seo, sin é go léir. Cad a fuair muid:

  • dag nua sa chomhéadan gréasáin,
  • céad go leith tasc a dhéanfar go comhthreomhar (má cheadaíonn na socruithe Airflow, Soilire agus cumas an fhreastalaí é).

Bhuel, fuair beagnach é.

Apache Airflow: ETL a dhéanamh níos éasca
Cé a shuiteáilfidh na spleáchais?

Chun an rud ar fad seo a shimpliú, chuaigh mé isteach docker-compose.yml próiseáil requirements.txt ar gach nóid.

Anois tá sé imithe:

Apache Airflow: ETL a dhéanamh níos éasca

Is tascanna iad cearnóga liatha a phróiseálann an sceidealóir.

Fanann muid beagán, gearrann na hoibrithe na tascanna:

Apache Airflow: ETL a dhéanamh níos éasca

Tá na cinn glas, ar ndóigh, tar éis a gcuid oibre a chríochnú go rathúil. Níl an-rath ar Reds.

Dála an scéil, níl aon fhillteán ar ár dtáirgí ./dags, níl aon sioncrónú idir meaisíní - luíonn gach dags isteach git ar ár Gitlab, agus dáileann Gitlab CI nuashonruithe ar mheaisíní agus iad á gcumasc isteach master.

Beagán faoi Flower

Cé go bhfuil na hoibrithe ag caoineadh ár n-pacifiers, cuimhnigh ar uirlis eile a fhéadfaidh rud éigin a thaispeáint dúinn - Flower.

An chéad leathanach ina bhfuil faisnéis achomair ar nóid oibrithe:

Apache Airflow: ETL a dhéanamh níos éasca

An leathanach is déine le tascanna a chuaigh ag obair:

Apache Airflow: ETL a dhéanamh níos éasca

An leathanach is leadránach le stádas ár mbróicéir:

Apache Airflow: ETL a dhéanamh níos éasca

Tá an leathanach is gile le graif stádais tascanna agus a gcuid ama cur i gcrích:

Apache Airflow: ETL a dhéanamh níos éasca

Táimid luchtaithe an underloaded

Mar sin, d'oibrigh na tascanna go léir amach, is féidir leat an lucht créachtaithe a dhéanamh.

Apache Airflow: ETL a dhéanamh níos éasca

Agus bhí go leor lucht créachtaithe - ar chúis amháin nó eile. I gcás úsáid cheart a bhaint as Airflow, léiríonn na cearnóga céanna seo nach raibh na sonraí cinnte.

Ní mór duit féachaint ar an logáil agus atosú ar na cásanna tasc tite.

Trí chliceáil ar aon chearnóg, feicfimid na gníomhartha atá ar fáil dúinn:

Apache Airflow: ETL a dhéanamh níos éasca

Is féidir leat a ghlacadh agus a dhéanamh Glan an tite. Is é sin, déanaimid dearmad go bhfuil rud éigin teipthe ann, agus beidh an tasc ásc céanna téigh go dtí an sceidealóir.

Apache Airflow: ETL a dhéanamh níos éasca

Is léir nach bhfuil sé an-daonnúil é seo a dhéanamh leis an luch leis na cearnóga dearga go léir - ní hé seo an rud a mbeimid ag súil leis ó Airflow. Ar ndóigh, tá airm ollscriosta againn: Browse/Task Instances

Apache Airflow: ETL a dhéanamh níos éasca

Roghnaigh muid gach rud ag an am céanna agus athshocróimid go nialas, cliceáil ar an mír cheart:

Apache Airflow: ETL a dhéanamh níos éasca

Tar éis a ghlanadh, breathnaíonn ár tacsaithe mar seo (tá siad ag fanacht leis an sceidealóir iad a sceidealú cheana féin):

Apache Airflow: ETL a dhéanamh níos éasca

Naisc, crúcaí agus athróga eile

Tá sé in am breathnú ar an gcéad DAG eile, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

An ndearna gach duine nuashonrú tuairisce riamh? Seo í arís: tá liosta foinsí ann óna bhfaightear na sonraí; tá liosta cá háit le cur; ná déan dearmad honk nuair a tharla nó a bhris gach rud (bhuel, níl sé seo fúinn, níl).

A ligean ar dul tríd an comhad arís agus breathnú ar an stuif nua doiléir:

  • from commons.operators import TelegramBotSendMessage - ní chuireann aon rud cosc ​​orainn ár n-oibreoirí féin a dhéanamh, rud a bhaineamar leas as trí fhillteán beag a dhéanamh chun teachtaireachtaí a sheoladh chuig Unblocked. (Beidh muid ag caint níos mó faoin oibreoir seo thíos);
  • default_args={} - is féidir le dag na hargóintí céanna a dháileadh ar a cuid oibreoirí go léir;
  • to='{{ var.value.all_the_kings_men }}' - Gort to ní bheidh hardcoded againn, ach ginte go dinimiciúil ag baint úsáide as Jinja agus athróg le liosta de na ríomhphoist, a chuir mé go cúramach i Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — coinníoll chun an t-oibreoir a thosú. Is é ár gcás, beidh an litir ag eitilt go dtí an bosses ach amháin má tá gach spleáchas oibriú amach go rathúil;
  • tg_bot_conn_id='tg_main' - argóintí conn_id glacadh le haitheantais naisc a chruthaímid iontu Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - ní bheidh teachtaireachtaí i Telegram eitilt ar shiúl ach amháin má tá tascanna tite;
  • task_concurrency=1 - cuirimid cosc ​​ar roinnt cásanna de thasc amháin a sheoladh go comhuaineach. Seachas sin, gheobhaidh muid an seoladh comhuaineach roinnt VerticaOperator (ag féachaint ar bhord amháin);
  • report_update >> [email, tg] - go léir VerticaOperator teacht le chéile agus litreacha agus teachtaireachtaí á seoladh, mar seo:
    Apache Airflow: ETL a dhéanamh níos éasca

    Ach ós rud é go bhfuil coinníollacha seolta éagsúla ag oibreoirí fógra, ní oibreoidh ach duine amháin. Sa Tree View, tá cuma beagán níos lú ar gach rud:
    Apache Airflow: ETL a dhéanamh níos éasca

Beidh mé ag rá cúpla focal faoi macraí agus a gcairde - athróga.

Is áitshealbhóirí Jinja iad Macraí ar féidir leo faisnéis úsáideach éagsúla a chur in áit argóintí oibreoirí. Mar shampla, mar seo:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} leathnófar go dtí ábhar na hathróige comhthéacs é execution_date san fhormáid YYYY-MM-DD: 2020-07-14. Is é an chuid is fearr ná go gcuirtear athróga comhthéacs in iúl do thasc ar leith (cearnóg sa Tree View), agus nuair a atosófar iad, leathnóidh na sealbhóirí áite chuig na luachanna céanna.

Is féidir na luachanna sannta a fheiceáil ag baint úsáide as an gcnaipe Rindreáilte ar gach tasc ásc. Seo mar a bhí an tasc le litir a sheoladh:

Apache Airflow: ETL a dhéanamh níos éasca

Agus mar sin ag an tasc le teachtaireacht a sheoladh:

Apache Airflow: ETL a dhéanamh níos éasca

Tá liosta iomlán macraí ionsuite don leagan is déanaí atá ar fáil ar fáil anseo: tagairt macraí

Thairis sin, le cabhair ó fhorlíontáin, is féidir linn ár macraí féin a dhearbhú, ach sin scéal eile.

Chomh maith leis na rudaí réamhshainithe, is féidir linn luachanna ár n-athróg a chur in ionad (d'úsáid mé é seo cheana féin sa chód thuas). A ligean ar a chruthú i Admin/Variables cupla rud:

Apache Airflow: ETL a dhéanamh níos éasca

Gach rud is féidir leat a úsáid:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Is féidir leis an luach a bheith ina scálach, nó is féidir gur JSON é freisin. I gcás JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

bain úsáid as an cosán go dtí an eochair atá ag teastáil: {{ var.json.bot_config.bot.token }}.

Déarfaidh mé focal amháin go litriúil agus taispeánfaidh mé seat amháin faoi naisc. Tá gach rud bunrang anseo: ar an leathanach Admin/Connections cruthaímid nasc, cuirimid ár logins / pasfhocail agus paraiméadair níos sainiúla ann. Mar seo:

Apache Airflow: ETL a dhéanamh níos éasca

Is féidir pasfhocail a chriptiú (níos cruinne ná an réamhshocrú), nó is féidir leat an cineál ceangail a fhágáil amach (mar a rinne mé do tg_main) - is é an bhfíric go bhfuil an liosta de na cineálacha hardwired i múnlaí Airflow agus ní féidir a leathnú gan dul isteach ar na cóid foinse (más rud é go tobann ní raibh mé google rud éigin, le do thoil ceart dom), ach ní bheidh aon rud a stopadh dúinn ó creidmheasanna a fháil díreach ag ainm.

Is féidir leat a dhéanamh freisin naisc éagsúla leis an ainm céanna: sa chás seo, an modh BaseHook.get_connection(), a fhaigheann naisc de réir ainm dúinn, a thabhairt randamach ó roinnt ainmneacha (bheadh ​​sé níos loighciúil Babhta Robin a dhéanamh, ach fágaimis é ar choinsias na bhforbróirí Airflow).

Is cinnte gur uirlisí fionnuara iad Athróga agus Naisc, ach tá sé tábhachtach gan an t-iarmhéid a chailleadh: cé na codanna de do shreabhadh a stórálann tú sa chód féin, agus na codanna a thugann tú do Airflow le haghaidh stórála. Ar thaobh amháin, is féidir go mbeadh sé áisiúil an luach a athrú go tapa, mar shampla, bosca poist, tríd an Chomhéadain. Ar an láimh eile, tá sé seo fós ar ais chuig an cliceáil luiche, as a raibh muid (mé) ag iarraidh a fháil réidh.

Tá oibriú le naisc ar cheann de na tascanna crúcaí. Go ginearálta, is pointí iad crúcaí Airflow chun é a nascadh le seirbhísí agus leabharlanna tríú páirtí. m.sh., JiraHook osclóidh cliant dúinn idirghníomhú le Jira (is féidir leat tascanna a bhogadh ar ais agus amach), agus le cabhair ó SambaHook is féidir leat comhad áitiúil a bhrú chun smb-pointe.

An t-oibreoir saincheaptha a pharsáil

Agus tháinig gar dúinn féachaint ar conas a dhéantar é TelegramBotSendMessage

Cód commons/operators.py leis an oibreoir iarbhír:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Anseo, cosúil le gach rud eile in Airflow, tá gach rud an-simplí:

  • Oidhreacht ó BaseOperator, a chuireann go leor rudaí a bhaineann go sonrach le Aer-Shreabhadh i bhfeidhm (féach ar do chuid fóillíochta)
  • Réimsí dearbhaithe template_fields, ina mbeidh Jinja lorg Macraí a phróiseáil.
  • D'eagraigh na hargóintí cearta ar son __init__(), socraigh na mainneachtainí nuair is gá.
  • Ní dhearna muid dearmad faoi thúsú an sinsear ach an oiread.
  • D'oscail an hook comhfhreagrach TelegramBotHookfuair rud cliant uaidh.
  • Modh sáraithe (athshainithe). BaseOperator.execute(), a dhéanfaidh Airfow twitch nuair a thagann an t-am chun an t-oibreoir a sheoladh - ann cuirfimid an príomhghníomh i bhfeidhm, ag dearmad logáil isteach. (Logaimid isteach, dála an scéil, ceart isteach stdout и stderr - Déanfaidh aer-sreabhadh gach rud a thascradh, é a fhilleadh go hálainn, é a dhianscaoileadh nuair is gá.)

A ligean ar a fheiceáil cad atá againn commons/hooks.py. An chéad chuid den chomhad, leis an duán féin:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Níl a fhios agam fiú cad atá le míniú anseo, ní dhéanfaidh mé ach na pointí tábhachtacha a thabhairt faoi deara:

  • Déanaimid oidhreacht, smaoinímid ar na hargóintí - i bhformhór na gcásanna beidh sé ar cheann: conn_id;
  • Sárú ar mhodhanna caighdeánacha: chuir mé teorainn orm féin get_conn(), ina bhfaighidh mé na paraiméadair nasc de réir ainm agus díreach an t-alt a fháil extra (réimse JSON é seo), inar chuir mé (de réir mo threoracha féin!) an comhartha bot Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Cruthaím sampla dár TelegramBot, rud a thugann comhartha sonrach dó.

Sin é an méid. Is féidir leat cliant a fháil ó Hook ag baint úsáide as TelegramBotHook().clentTelegramBotHook().get_conn().

Agus an dara cuid den chomhad, ina ndéanaim microwrapper don Telegram REST API, ionas nach tarraingim an rud céanna python-telegram-bot ar mhodh amháin sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Is é an bealach ceart é a shuimiú: TelegramBotSendMessage, TelegramBotHook, TelegramBot - sa bhreiseán, cuir i stór poiblí é, agus tabhair chuig Foinse Oscailte é.

Agus muid ag déanamh staidéir air seo go léir, d’éirigh lenár nuashonruithe tuairisce theip go rathúil agus chuir siad teachtaireacht earráide chugam sa chainéal. Táim chun seiceáil féachaint an bhfuil sé mícheart ...

Apache Airflow: ETL a dhéanamh níos éasca
Bhris rud éigin inár madra! Nach é sin a rabhamar ag súil leis? Díreach!

An bhfuil tú ag dul a dhoirteadh?

An mbraitheann tú gur chaill mé rud éigin? Dealraíonn sé gur gheall sé sonraí a aistriú ó SQL Server go Vertica, agus ansin thóg sé é agus bhog sé as an ábhar, an scoundrel!

Bhí an t-uafás seo d'aon ghnó, ní raibh orm ach téarmaíocht éigin a mhíniú duit. Anois is féidir leat dul níos faide.

Ba é seo an plean a bhí againn:

  1. Déan dag
  2. Gin tascanna
  3. Féach cé chomh hálainn atá gach rud
  4. Sann uimhreacha seisiúin le líonadh
  5. Faigh sonraí ó SQL Server
  6. Cuir sonraí isteach i Vertica
  7. Bailigh staitisticí

Mar sin, chun é seo a chur ar bun agus a reáchtáil, chuir mé beagán breise lenár gcuid docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Ardaímid ansin:

  • Vertica mar óstach dwh leis na socruithe is réamhshocraithe,
  • trí chás de fhreastalaí SQL,
  • líonaimid na bunachair shonraí sa dara ceann le roinnt sonraí (ní breathnú isteach i gcás ar bith mssql_init.py!)

Seolann muid gach rud go maith le cabhair ó ordú beagán níos casta ná an uair dheireanach:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Cad a ghin ár randomizer miracle, is féidir leat úsáid a bhaint as an mír Data Profiling/Ad Hoc Query:

Apache Airflow: ETL a dhéanamh níos éasca
Is é an rud is mó ná é a thaispeáint d'anailísithe

mionléiriú ar Seisiúin ETL Ní dhéanfaidh mé, tá gach rud fánach ansin: déanaimid bonn, tá comhartha ann, fillteann muid gach rud le bainisteoir comhthéacs, agus anois déanaimid é seo:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

seisiún.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Tá an t-am tagtha ár sonraí a bhailiú ónár gcéad go leith tábla. Déanaimis é seo le cabhair ó línte an-unpretentious:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Le cabhair ó Hook a fháil againn ó Airflow pymssql-ceangal
  2. Cuir srian i bhfoirm dáta isteach san iarratas - cuirfidh an t-inneall teimpléad isteach é san fheidhm.
  3. Ár n-iarratas a bheathú pandasa gheobhaidh sinn DataFrame - beidh sé úsáideach dúinn amach anseo.

Tá ionadú á úsáid agam {dt} in ionad paraiméadar iarratais %s ní toisc gur Pinocchio olc mé, ach toisc pandas Ní féidir a láimhseáil pymssql agus duillíní an ceann deireanach params: Listcé gur mian leis i ndáiríre tuple.
Chomh maith leis sin faoi deara go bhfuil an forbróir pymssql chinn gan tacaíocht a thabhairt dó níos mó, agus tá sé in am chun bogadh amach pyodbc.

Feicfimid cad a líontar argóintí ár bhfeidhmeanna le Airflow:

Apache Airflow: ETL a dhéanamh níos éasca

Mura bhfuil aon sonraí ann, níl aon phointe ann leanúint ar aghaidh. Ach tá sé aisteach freisin an líonadh a mheas go rathúil. Ach ní botún é seo. A-ah-ah, cad atá le déanamh?! Agus seo cad:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException insíonn sé do Airflow nach bhfuil aon earráidí, ach ní dhéanaimid an tasc. Ní bheidh cearnach glas nó dearg ag an gcomhéadan, ach bándearg.

Caithfimid ár sonraí colúin iolracha:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

eadhon

  • An bunachar sonraí ónar ghlacamar na horduithe,
  • ID ár seisiún tuilte (beidh sé difriúil do gach tasc),
  • A hash ón bhfoinse agus ID ordú - ionas go mbeidh sa bhunachar sonraí deiridh (ina bhfuil gach rud poured isteach tábla amháin) ní mór dúinn ID ordú uathúil.

Tá an chéim leathdhéanach fós: doirt gach rud isteach i Vertica. Agus, rud aisteach go leor, is é CSV ceann de na bealaí is iontach agus is éifeachtaí chun é seo a dhéanamh!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Táimid ag déanamh glacadóir speisialta StringIO.
  2. pandas cuirfidh cineálta ár DataFrame i bhfoirm CSV-línte.
  3. A ligean ar oscailt nasc chuig ár Vertica is fearr leat le Hook.
  4. Agus anois le cabhair copy() seol ár sonraí go díreach chuig Vertika!

Tógfaimid ón tiománaí cé mhéad líne a líonadh, agus inseoimid do bhainisteoir an tseisiúin go bhfuil gach rud ceart go leor:

session.loaded_rows = cursor.rowcount
session.successful = True

Sin é an méid.

Ar an díolachán, cruthaímid an spriocphláta de láimh. Anseo cheadaigh mé meaisín beag dom féin:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Tá mé ag úsáid VerticaOperator() Cruthaím scéimre bunachar sonraí agus tábla (mura bhfuil siad ann cheana féin, ar ndóigh). Is é an rud is mó ná na spleáchais a shocrú i gceart:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Achoimre

- Bhuel, - a dúirt an luch beag, - nach bhfuil, anois
An bhfuil tú cinnte gurb mise an t-ainmhí is uafásach san fhoraois?

Julia Donaldson, An Gruffalo

I mo thuairimse, dá mbeadh mo chomhghleacaithe agus mé comórtas: cé a chruthú agus a sheoladh go tapa próiseas ETL ó scratch: siad lena SSIS agus luch agus mé le Airflow ... Agus ansin ba mhaith linn a chur i gcomparáid freisin ar an éascaíocht cothabhála ... Wow, sílim go n-aontóidh tú go mbuafaidh mé iad ar gach taobh!

Más rud é beagán níos dáiríre, ansin rinne Apache Airflow - trí chur síos a dhéanamh ar phróisis i bhfoirm cód cláir - mo phost i bhfad níos compordaí agus níos taitneamhaí.

Tugann a shíneadh neamhtheoranta, i dtéarmaí breiseán agus réamhshuíomh le hinscálaitheacht, an deis duit Airflow a úsáid i mbeagnach aon réimse: fiú amháin sa timthriall iomlán de bhailiú, ullmhú agus próiseáil sonraí, fiú agus roicéad á seoladh (go Mars, de. cúrsa).

Cuid deiridh, tagairt agus faisnéis

An raic atá bailithe againn duit

  • start_date. Sea, is meme áitiúil é seo cheana féin. Trí phríomh-argóint Doug start_date pas ar fad. Go hachomair, má shonraíonn tú i start_date dáta reatha, agus schedule_interval - lá amháin, ansin beidh DAG ag tosú amárach tráth nach luaithe.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Agus gan fadhbanna níos mó.

    Tá earráid ama rite eile bainteach leis: Task is missing the start_date parameter, rud a léiríonn go minic go ndearna tú dearmad ceangal a dhéanamh leis an oibreoir dag.

  • Go léir ar mheaisín amháin. Sea, agus bunanna (Airflow féin agus ár sciath), agus freastalaí gréasáin, agus sceidealóir, agus oibrithe. Agus d'oibrigh sé fiú. Ach le himeacht ama, d'fhás líon na dtascanna do sheirbhísí, agus nuair a thosaigh PostgreSQL ag freagairt don innéacs i 20 s in ionad 5 ms, thógamar é agus thugamar ar shiúl é.
  • Seiceadóir Áitiúil. Sea, táimid fós inár suí air, agus tá muid tagtha go dtí imeall an duibheagáin cheana féin. Is leor LocalExecutor dúinn go dtí seo, ach anois tá sé in am leathnú le hoibrí amháin ar a laghad, agus beidh orainn oibriú go crua chun bogadh go CeleryExecutor. Agus i bhfianaise gur féidir leat oibriú leis ar mheaisín amháin, ní chuireann aon rud bac ort Soilire a úsáid fiú ar fhreastalaí, rud “ar ndóigh, ní rachaidh sé isteach i dtáirgeadh, go hionraic!”
  • Neamhúsáid uirlisí ionsuite:
    • Naisc chun dintiúir seirbhíse a stóráil,
    • SLA chailleann freagairt do thascanna nár oibrigh amach in am,
    • xcom le haghaidh malartú meiteashonraí (a dúirt mé meiteadata!) idir thascanna dag.
  • Mí-úsáid ríomhphoist. Bhuel, cad is féidir liom a rá? Socraíodh foláirimh le haghaidh gach athrá ar thascanna tite. Anois tá >90k r-phoist ag Gmail ó Airflow, agus diúltaíonn an muzzle ríomhphoist gréasáin níos mó ná 100 a phiocadh suas agus a scriosadh ag an am céanna.

Tuilleadh gaistí: Machair Sreafa Aeir Apache

Tuilleadh uirlisí uathoibrithe

Ionas gur féidir linn oibriú níos mó fós lenár gcinn agus ní lenár lámha, d’ullmhaigh Airflow dúinn é seo:

  • REST API - tá stádas Turgnamhach fós aige, rud nach gcuireann cosc ​​air oibriú. Leis, ní hamháin gur féidir leat eolas a fháil faoi dags agus faoi thascanna, ach freisin stop a chur / tús a chur le dag, Rith DAG nó linn snámha a chruthú.
  • CLI - tá go leor uirlisí ar fáil tríd an líne ordaithe nach bhfuil deacair iad a úsáid tríd an WebUI, ach atá as láthair go ginearálta. Mar shampla:
    • backfill ag teastáil chun cásanna tasc a atosú.
      Mar shampla, tháinig anailísithe agus dúirt: “Agus tá nonsense agatsa, a chomrádaí, sna sonraí ón 1 Eanáir go dtí an 13 Eanáir! Deisigh é, deisigh é, deisigh é, deisigh é!" Agus is hobad den sórt sin thú:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Bunseirbhís: initdb, resetdb, upgradedb, checkdb.
    • run, a ligeann duit tasc ásc amháin a rith, agus fiú scóráil ar gach spleáchas. Thairis sin, is féidir leat é a rith trí LocalExecutor, fiú má tá braisle Soilire agat.
    • An rud céanna go leor test, ach freisin i boinn scríobhann rud ar bith.
    • connections Ceadaíonn cruthú mais naisc as an bhlaosc.
  • API Python - bealach idirghníomhaithe sách crua, atá beartaithe le haghaidh breiseán, agus gan a bheith ag snámh inti le lámha beaga. Ach cé atá chun stop a chur orainn dul go dtí /home/airflow/dags, rith ipython agus tosú ag praiseach thart? Is féidir leat, mar shampla, gach nasc leis an gcód seo a leanas a onnmhairiú:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Ag nascadh le bunachar meiteashonraí Airflow. Ní mholaim scríobh chuige, ach d’fhéadfadh sé a bheith i bhfad níos tapúla agus níos éasca sonraí tasc a fháil le haghaidh méadrachtaí sonracha éagsúla ná trí aon cheann de na APInna.

    Ligean le rá nach bhfuil gach ceann dár tascanna idempotent, ach is féidir leo titim uaireanta, agus tá sé seo gnáth. Ach tá cúpla blockchain amhrasach cheana féin, agus bheadh ​​​​sé riachtanach a sheiceáil.

    Seachain SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

tagairtí

Agus ar ndóigh, is iad na chéad deich nasc ó eisiúint Google ábhar an fhillteáin Airflow ó mo leabharmharcanna.

Agus na naisc a úsáidtear san alt:

Foinse: will.com