Apache Airflow: Making ETL Easier

Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.

I will tell you about a wonderful tool for developing ETL processes: Apache Airflow. Airflow is so versatile and multifaceted that it deserves a closer look even if you are not involved in data flows at all, but simply need to launch processes periodically and monitor their execution.

And yes, I will not only tell but also show: the article has plenty of code, screenshots, and recommendations.

[Image: what you usually see when you google the word Airflow / Wikimedia Commons]

Introduction

Apache Airflow is just like Django:

  • written in Python,
  • has a great admin panel,
  • endlessly extensible

- only better, and made for entirely different purposes, namely (as it was written before the cut):

  • running and monitoring tasks on an unlimited number of machines (as many as Celery / Kubernetes and your conscience allow)
  • with dynamic workflow generation from Python code that is very easy to write and understand
  • and the ability to connect any databases and APIs to each other using both ready-made components and home-made plugins (which is extremely simple to do).

We use Apache Airflow like this:

  • we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (for us these are Vertica and Clickhouse);
  • as an advanced cron that starts the data consolidation processes on the ODS and also monitors their maintenance.

Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow, this runs:

  • more than 200 DAGs (actually workflows, into which we stuffed the tasks),
  • each with 70 tasks on average,
  • and all this goodness starts (also on average) once an hour.

I will write below about how we scaled up, but for now let's define the über-problem that we are going to solve:

There are three source SQL Servers, each with 50 databases. These are instances of one project, so they share the same structure (almost everywhere, mua-ha-ha), which means each one has an Orders table (fortunately, a table with that name can be pushed into any business). We take the data, add service fields (source server, source database, ETL task ID), and naively throw it into, say, Vertica.

Let's go!

The main part, practical (and a little theoretical)

Why we (and you) need this

Back when the trees were big and I was a simple SQL guy at a Russian retailer, we cobbled together ETL processes, aka data flows, using the two tools available to us:

  • Informatica Power Center - an extremely sprawling system, extremely productive, with its own hardware and its own versioning. I used, God forbid, 1% of its capabilities. Why? Well, first of all, this interface, from somewhere in the 2000s, put mental pressure on us. Second, this contraption is designed for extremely fancy processes, furious component reuse, and other very-important-enterprise tricks. As for the fact that it costs about as much as the wing of an Airbus A380 per year, we will say nothing.

    Beware, the screenshot may slightly hurt people under 30

    [Screenshot]

  • SQL Server Integration Server - we used this comrade in our intra-project flows. Well, indeed: we already use SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything in it is good: the interface is beautiful, and so are the progress reports... But that is not why we love software products, oh, not for that. Versioning its dtsx (which is XML with nodes shuffled on every save) is possible, but what's the point? How about making a package of tasks that drags hundreds of tables from one server to another? Never mind a hundred: your index finger will fall off after twenty of them from clicking the mouse button. But it definitely looks more fashionable:

    [Screenshot]

We certainly looked for ways out. It even almost came to a self-written SSIS package generator...

...and then a new job found me. And there Apache Airflow caught up with me.

When I found out that ETL process descriptions are simple Python code, I all but danced for joy. Data flows could now be versioned and diffed, and pouring tables with the same structure from hundreds of databases into one target became a matter of Python code that fits into one and a half or two 13" screens.

Assembling the cluster

Let's not arrange a complete kindergarten, and let's not talk here about completely obvious things like installing Airflow, your chosen database, Celery, and other cases described in the docs.

So that we can start experimenting right away, I sketched a docker-compose.yml in which:

  • We actually bring up Airflow: Scheduler, Webserver. Flower will also spin there to monitor Celery tasks (because it has already been pushed into apache/airflow:1.10.10-python3.7, and we don't mind);
  • PostgreSQL, in which Airflow will write its service information (scheduler data, execution statistics, etc.), and Celery will mark completed tasks;
  • Redis, which will act as a task broker for Celery;
  • Celery worker, which will be engaged in the direct execution of tasks.
  • In the ./dags folder we will put our files with the DAG descriptions. They will be picked up on the fly, so there is no need to juggle the whole stack after every sneeze.

In some places the code in the examples is not shown in full (so as not to clutter the text), and in some places it is modified along the way. Complete working code examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • In assembling the composition, I largely relied on the well-known image puckel/docker-airflow - be sure to check it out. You may not need anything else in your life.
  • All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I maliciously took advantage of.
  • Naturally, it is not production-ready: I deliberately did not put heartbeats on the containers, and I did not bother with security. But I did the minimum suitable for our experiments.
  • Note that:
    • The DAG folder must be accessible to both the scheduler and the workers.
    • The same applies to all third-party libraries - they must all be installed on the machines with the scheduler and the workers.
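The environment variables in the compose file follow the AIRFLOW__{SECTION}__{KEY} naming pattern, where the section and key correspond to entries in airflow.cfg. A minimal sketch of that mapping is shown below; the helper name parse_airflow_env is made up for illustration and is not part of Airflow.

```python
from typing import Dict, Tuple

PREFIX = 'AIRFLOW__'


def parse_airflow_env(environ: Dict[str, str]) -> Dict[Tuple[str, str], str]:
    """Map AIRFLOW__{SECTION}__{KEY} variables to (section, key) pairs.

    Hypothetical helper: it only illustrates the naming convention.
    """
    options = {}
    for name, value in environ.items():
        if not name.startswith(PREFIX):
            continue
        # Split on the first '__' only, so the key part stays intact.
        section, _, key = name[len(PREFIX):].partition('__')
        if section and key:
            options[(section.lower(), key.lower())] = value
    return options


# A few values taken from the compose file above, plus an unrelated variable.
env = {
    'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor',
    'AIRFLOW__CELERY__BROKER_URL': 'redis://broker:6379/0',
    'PATH': '/usr/bin',
}
options = parse_airflow_env(env)
for (section, key), value in sorted(options.items()):
    print(f'[{section}] {key} = {value}')
```

So AIRFLOW__CORE__EXECUTOR plays the role of the executor option in the [core] section of airflow.cfg.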

Well, now it's simple:

$ docker-compose up --scale worker=3

After everything comes up, you can look at the web interfaces: the Airflow webserver on port 8080 and Celery Flower on port 5555, as mapped in docker-compose.yml.

Basic concepts

If you did not understand anything in all these "dags", here is a short glossary:

  • Scheduler - the most important uncle in Airflow, who makes sure that robots work hard, not people: it monitors the schedule, updates DAGs, and launches tasks.

    In older versions it had memory problems (no, not amnesia, but leaks), and a legacy parameter even remained in the configs: run_duration, its restart interval. But now everything is fine.

  • DAG (aka "dag") - "directed acyclic graph", but such a definition tells few people much. In essence it is a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS and a Workflow in Informatica.

    In addition to DAGs, there may also be subdags, but we most likely won't get to them.

  • DAG Run - an initialized DAG, which is assigned its own execution_date. DAG runs of the same DAG can work in parallel (if you made your tasks idempotent, of course).
  • Operator - a piece of code responsible for performing a specific action. There are three types of operators:
    • action, like our favorite PythonOperator, which can execute any (valid) Python code;
    • transfer, which moves data from place to place, say, MsSqlToHiveTransfer;
    • sensor, which lets you react to an event or hold back further execution of the DAG until the event occurs. HttpSensor can poll the specified endpoint and, when the desired response arrives, start the transfer GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "Why? After all, you can do the retries right in the operator!" The answer: so as not to clog the pool of tasks with suspended operators. The sensor starts, checks, and dies before the next attempt.
  • Task - declared operators, regardless of type, once attached to a DAG are promoted to the rank of task.
  • Task instance - when the scheduler-general decides that it is time to send tasks into battle on the performer-workers (right on the spot if we use LocalExecutor, or to a remote node in the case of CeleryExecutor), it assigns them a context (i.e., a set of variables - execution parameters), expands command or query templates, and pools them.
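To make the "directed acyclic graph" part of the glossary concrete, here is a small sketch that uses the standard-library graphlib module (Python 3.9+) instead of Airflow itself. The task names are hypothetical. It shows that an acyclic dependency graph always admits an execution order in which every task runs after its upstream tasks, and that a cycle makes such an order impossible.

```python
from graphlib import CycleError, TopologicalSorter

# Map each (hypothetical) task to the set of tasks it depends on.
dependencies = {
    'load_orders': {'extract_orders'},
    'collect_stats': {'load_orders'},
    'notify': {'load_orders', 'collect_stats'},
}

# A valid execution order: every task appears after all of its upstream tasks.
order = list(TopologicalSorter(dependencies).static_order())
print(order)

# Adding a back edge makes the graph cyclic, so no valid order exists.
cyclic = dict(dependencies, extract_orders={'notify'})
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print('cycle detected:', has_cycle)
```

This is only an illustration of the graph property; Airflow has its own scheduling logic on top of it.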

Generating tasks

First, let's outline the general scheme of our DAG, and then we will dive deeper and deeper into the details, because we apply some non-trivial solutions.

So, in its simplest form, such a DAG will look like this:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's figure it out:

  • First, we import the necessary libs and something else;
  • sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases from which we will take our table;
  • dag - the declaration of our DAG, which must be in globals(), otherwise Airflow will not find it. The DAG also needs to say:
    • that its name is orders - this name will then appear in the web interface,
    • that it will work starting from midnight on the eighth of July,
    • and that it should run approximately every 6 hours (for tough guys, instead of timedelta() a cron line 0 0 0/6 ? * * * is admissible; for the less cool, an expression like @daily);
  • workflow() will do the main job, but not now. For now, we will just dump our context into the log.
  • And now the simple magic of creating tasks:
    • we run through our sources;
    • we initialize PythonOperator, which will execute our dummy workflow(). Don't forget to specify a unique (within the DAG) task name and to tie it to the DAG itself. The provide_context flag, in turn, pours additional arguments into the function, which we carefully collect using **context.
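The contents of commons.datasources are not shown in the article, so here is a rough sketch of what sql_server_ds could look like. The connection IDs and the schema naming scheme are assumptions made for illustration only. Because the DAG uses the schema as task_id, the sketch prefixes each schema with its server name to keep the task names unique within the DAG.

```python
from collections import namedtuple

DataSource = namedtuple('DataSource', 'conn_id schema')

# Hypothetical naming: three servers, fifty project databases on each.
SERVERS = ['mssql_0', 'mssql_1', 'mssql_2']
DATABASES_PER_SERVER = 50

sql_server_ds = [
    DataSource(conn_id=server, schema=f'{server}_project_{n:02d}')
    for server in SERVERS
    for n in range(DATABASES_PER_SERVER)
]

# The DAG loop unpacks each item as (conn_id, schema) and uses schema as task_id.
task_ids = [schema for conn_id, schema in sql_server_ds]
print(len(task_ids), 'tasks, for example:', task_ids[0])
```

With three servers and fifty databases each, the loop in the DAG produces 150 tasks.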

That's all for now. What we got:

  • a new DAG in the web interface,
  • a hundred and fifty tasks that will run in parallel (if the Airflow and Celery settings and the server capacity allow it).

Well, we almost got it.

[Screenshot: Who will install the dependencies?]

To simplify this whole thing, I wired requirements.txt processing on all nodes into docker-compose.yml.

Now we're off:

[Screenshot]

The gray squares are task instances processed by the scheduler.

We wait a little, and the workers snap up the tasks:

[Screenshot]

The green ones, of course, have completed their work successfully. The red ones were not very successful.

By the way, there is no ./dags folder on our prod and no synchronization between machines: all DAGs live in git on our Gitlab, and Gitlab CI distributes updates to the machines on merge into master.

A little about Flower

While the workers are thrashing our dummy tasks, let's remember another tool that can show us something: Flower.

The very first page, with summary information on the worker nodes:

[Screenshot]

The most intense page, with the tasks that went to work:

[Screenshot]

The most boring page, with the status of our broker:

[Screenshot]

The brightest page, with task status graphs and execution times:

[Screenshot]

Loading the underloaded

So, all the tasks have run, and you can carry away the wounded.

[Screenshot]

And there were many wounded, for one reason or another. When Airflow is used correctly, these very squares indicate that the data definitely did not arrive.

You need to look at the log and restart the fallen task instances.

By clicking on any square, we see the actions available to us:

[Screenshot]

You can take the fallen and Clear them. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

[Screenshot]

Clearly, doing this with the mouse for all the red squares is not very humane, and it is not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances

[Screenshot]

Let's select everything at once and reset it by clicking the right item:

[Screenshot]

After the cleanup, our tasks look like this (they are already waiting for the scheduler to schedule them):

[Screenshot]

Connections, hooks and other variables

It's time to look at the next DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Gentlemen, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Has everyone done a report update at some point? This is it again: there is a list of sources to get the data from; there is a list of places to put it; and don't forget to honk when everything worked or broke (well, that's not about us, no).

Let's go through the file again and look at the new obscure stuff:

  • from commons.operators import TelegramBotSendMessage - nothing prevents us from making our own operators, which we took advantage of by making a small wrapper for sending messages to Telegram. (We will talk more about this operator below);
  • default_args={} - a DAG can distribute the same arguments to all of its operators;
  • to='{{ var.value.all_the_kings_men }}' - the to field is not hardcoded but generated dynamically using Jinja and a variable with a list of emails, which I carefully put in Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - the condition for starting the operator. In our case, the letter will fly to the bosses only if all dependencies have completed successfully;
  • tg_bot_conn_id='tg_main' - conn_id arguments accept the IDs of connections that we create in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - messages in Telegram will fly away only if there are fallen tasks;
  • task_concurrency=1 - we prohibit the simultaneous launch of several task instances of one task. Otherwise, we would get several VerticaOperator instances running at once (all looking at one table);
  • report_update >> [email, tg] - all VerticaOperator tasks converge on sending letters and messages, like this:
    [Screenshot]

    But since the notifier operators have different launch conditions, only one of them will work. In the Tree View, everything looks a little less visual:
    [Screenshot]
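The reason only one notifier fires can be sketched in plain Python. This is a simplified model of the two trigger rules used above, not Airflow's actual implementation: ALL_SUCCESS requires every upstream task to have succeeded, while ONE_FAILED requires at least one upstream task to have failed. The state strings and function names are assumptions made for the sketch.

```python
from typing import Iterable, List


def all_success(upstream_states: Iterable[str]) -> bool:
    """Simplified ALL_SUCCESS: every upstream task finished successfully."""
    return all(state == 'success' for state in upstream_states)


def one_failed(upstream_states: Iterable[str]) -> bool:
    """Simplified ONE_FAILED: at least one upstream task has failed."""
    return any(state in ('failed', 'upstream_failed') for state in upstream_states)


def notifiers_to_run(upstream_states: Iterable[str]) -> List[str]:
    """Return which of the two notifier tasks would fire for these states."""
    states = list(upstream_states)
    fired = []
    if all_success(states):
        fired.append('email_success')
    if one_failed(states):
        fired.append('telegram_fail')
    return fired


print(notifiers_to_run(['success'] * 5))
print(notifiers_to_run(['success', 'failed', 'success', 'success', 'success']))
```

The two conditions are mutually exclusive, so for any final set of upstream states at most one of the notifiers runs.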

I will say a few words about macros and their friends, variables.

Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} expands to the contents of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are nailed to a specific task instance (a square in the Tree View), so when the task is restarted, the placeholders expand to the same values.
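The effect can be imitated without Airflow. The sketch below uses a tiny regex-based stand-in for Jinja (not Jinja itself, and handling only simple {{ name }} placeholders) to show that rendering the same template for the same execution_date gives the same query on every restart. The function names are made up for the illustration.

```python
import re
from datetime import datetime

QUERY = "SELECT id FROM orders.payments WHERE payment_dtm::DATE = '{{ ds }}'::DATE"


def build_context(execution_date: datetime) -> dict:
    """Build a minimal context holding only the ds value."""
    return {'ds': execution_date.strftime('%Y-%m-%d')}


def render(template: str, context: dict) -> str:
    """Replace {{ name }} placeholders; a tiny stand-in for Jinja."""
    return re.sub(r'\{\{\s*(\w+)\s*\}\}',
                  lambda match: str(context[match.group(1)]),
                  template)


# The execution_date belongs to the task instance, not to the wall clock,
# so a restart renders exactly the same query.
execution_date = datetime(2020, 7, 14, 6, 0)
first_run = render(QUERY, build_context(execution_date))
rerun = render(QUERY, build_context(execution_date))
print(first_run)
```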

The substituted values can be viewed using the Rendered button on each task instance. This is how it looks for the task that sends the email:

[Screenshot]

And this is how it looks for the task that sends the message:

[Screenshot]

A complete list of the built-in macros for the latest available version is here: macros reference

In addition, with the help of plugins we can declare our own macros, but that is another story.

Besides the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of items in Admin/Variables:

[Screenshot]

That's it, you can use them:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can be JSON. In the case of JSON:

bot_config

{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}

just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
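What the dotted path does can be shown with the standard json module. This is only an illustration of walking a key path through a parsed JSON value, not Airflow's own lookup code. The token is written as a quoted string here because JSON requires string values to be quoted, and the resolve helper is hypothetical.

```python
import json
from functools import reduce

# The bot_config variable as it would be stored: a JSON string.
raw_value = '''
{
    "bot": {"token": "881hskdfASDA16641", "name": "Verter"},
    "service": "TG"
}
'''


def resolve(document: dict, dotted_path: str):
    """Walk a parsed JSON document along a dotted key path."""
    return reduce(lambda node, key: node[key], dotted_path.split('.'), document)


bot_config = json.loads(raw_value)
token = resolve(bot_config, 'bot.token')
print(token)
```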

I will say literally one word and show one screenshot about connections. Everything is elementary here: on the Admin/Connections page we create a connection and add our logins / passwords and more specific parameters there. Like this:

[Screenshot]

Passwords can be encrypted (more thoroughly than by default), and you can also leave the connection type empty (as I did for tg_main). The point is that the list of types is hardwired in the Airflow models and cannot be extended without getting into the source code (if I failed to google something, please correct me), but nothing stops us from getting the credentials just by name.

You can also create several connections with the same name: in this case the method BaseHook.get_connection(), which fetches connections by name, will return a random one of the namesakes (Round Robin would be more logical, but let's leave that on the conscience of the Airflow developers).
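The difference between the two selection strategies is easy to see in a toy model. The sketch below does not use Airflow: the Connection tuple, the registry, and both functions are hypothetical stand-ins that only imitate picking one of several connections sharing a name, first at random and then in round-robin order.

```python
import itertools
import random
from collections import defaultdict, namedtuple

Connection = namedtuple('Connection', 'conn_id host')

# Hypothetical registry: three connections share the name 'mssql'.
CONNECTIONS = [
    Connection('mssql', 'mssql_0'),
    Connection('mssql', 'mssql_1'),
    Connection('mssql', 'mssql_2'),
    Connection('dwh', 'dwh'),
]

registry = defaultdict(list)
for conn in CONNECTIONS:
    registry[conn.conn_id].append(conn)


def get_connection(conn_id: str) -> Connection:
    """Random choice among namesakes, as described in the text."""
    candidates = registry.get(conn_id)
    if not candidates:
        raise KeyError(f'No connection named {conn_id!r}')
    return random.choice(candidates)


# The alternative: hand out namesakes in turn.
_cycles = {conn_id: itertools.cycle(conns) for conn_id, conns in registry.items()}


def get_connection_round_robin(conn_id: str) -> Connection:
    """Return the next namesake in a fixed rotation."""
    return next(_cycles[conn_id])


print(get_connection('mssql').host)
```

With random choice, two consecutive calls may hit the same server; the round-robin variant spreads the calls evenly.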

Variables and Connections are certainly cool tools, but it is important not to lose the balance between the parts of your flows that you keep in the code itself and the parts that you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, for example a mailbox, through the UI. On the other hand, this is still a return to the mouse click, which we (I) wanted to get rid of.

Working with connections is one of the tasks of hooks. In general, Airflow hooks are the points for connecting it to third-party services and libraries. For example, JiraHook will open a client for us to interact with Jira (you can move tasks back and forth), and with the help of SambaHook you can push a local file to an smb share.

Parsing the custom operator

And now we are close to looking at how TelegramBotSendMessage is made.

The code of commons/operators.py with the operator itself:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, like everything else in Airflow, everything is very simple:

  • Inherited from BaseOperator, which implements quite a few Airflow-specific things (take a look at your leisure).
  • Declared the template_fields fields, in which Jinja will look for macros to process.
  • Arranged the right arguments for __init__() and set defaults where necessary.
  • Did not forget about the initialization of the ancestor either.
  • Opened the corresponding hook, TelegramBotHook, and received a client object from it.
  • Overrode the BaseOperator.execute() method, which Airflow will call when the time comes to run the operator. In it we implement the main action, not forgetting to log. (By the way, we log straight to stdout and stderr - Airflow will intercept everything, wrap it nicely, and put it where it belongs.)

Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don't even know what to explain here, so I will just note the important points:

  • We inherit and think about the arguments - in most cases there will be one: conn_id;
  • We override the standard methods: I limited myself to get_conn(), in which I get the connection parameters by name and just take the extra section (this is a JSON field), where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • I create an instance of our TelegramBot, giving it a specific token.
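The token lookup itself can be tried without a running Airflow. The sketch below uses a made-up FakeConnection class whose extra_dejson property mimics parsing the Extra field as JSON, plus a hypothetical helper that fails with a clear message when the token is missing. None of these names come from Airflow or from the article's repository.

```python
import json


class FakeConnection:
    """Hypothetical stand-in for a stored connection with an Extra field."""

    def __init__(self, conn_id, extra=None):
        self.conn_id = conn_id
        self.extra = extra

    @property
    def extra_dejson(self):
        """Return Extra parsed as a dict, or an empty dict if absent or invalid."""
        if not self.extra:
            return {}
        try:
            return json.loads(self.extra)
        except ValueError:
            return {}


def bot_token_from(connection):
    """Read the bot token from Extra, failing loudly if it is not there."""
    extra = connection.extra_dejson
    if 'bot_token' not in extra:
        raise KeyError(f'Connection {connection.conn_id!r} has no "bot_token" in Extra')
    return extra['bot_token']


tg_main = FakeConnection('tg_main', extra='{"bot_token": "YOuRAwEsomeBOtToKen"}')
print(bot_token_from(tg_main))
```

Checking for the key explicitly gives a more helpful error than a bare KeyError when someone forgets to fill in Extra.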

That's all. You can get a client from the hook using TelegramBotHook().client or TelegramBotHook().get_conn().

And the second part of the file, in which I make a micro-wrapper for the Telegram REST API, so as not to drag in the whole python-telegram-bot for the sake of a single sendMessage method.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))
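One practical detail of parse_mode 'MarkdownV2': as far as the Telegram Bot API documentation describes it, a set of special characters must be escaped with a backslash in ordinary text, otherwise the request is rejected. A message such as 'orders_dag failed :(' contains two of them. Below is a minimal sketch of an escaping helper; the function name is made up, it is not part of the wrapper above, and it assumes plain text with no intentional formatting and no backslashes.

```python
import re

# Characters that MarkdownV2 treats as special in ordinary text.
_MARKDOWN_V2_SPECIAL = r'_*[]()~`>#+-=|{}.!'


def escape_markdown_v2(text: str) -> str:
    """Prefix every MarkdownV2 special character with a backslash."""
    pattern = f'([{re.escape(_MARKDOWN_V2_SPECIAL)}])'
    return re.sub(pattern, r'\\\1', text)


escaped = escape_markdown_v2('orders_dag failed :(')
print(escaped)
```

Such a helper could be applied to the message before it goes into the payload, at the cost of losing the ability to send deliberately formatted text.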

The right way would be to put all of it (TelegramBotSendMessage, TelegramBotHook, TelegramBot) into a plugin, place it in a public repository, and give it to Open Source.

While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. I'm going to check whether something is really wrong...

[Screenshot: Something broke in our DAG! Isn't that what we were expecting? Exactly!]

So, are you going to load anything?

Do you feel that I missed something? It seems he promised to transfer data from SQL Server to Vertica, and then he went off topic, the scoundrel!

This atrocity was intentional: I simply had to decipher some terminology for you. Now we can go further.

Our plan was this:

  1. Make the DAG
  2. Generate the tasks
  3. See how beautiful everything is
  4. Assign session numbers to the loads
  5. Get the data from SQL Server
  6. Put the data into Vertica
  7. Collect statistics

So, to get all this up and running, I made a small addition to our docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

There we bring up:

  • Vertica as the host dwh with the most default settings,
  • three instances of SQL Server,
  • we fill the databases in the latter with some data (do not look into mssql_init.py under any circumstances!)

We launch all this goodness with a slightly more complicated command than last time:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

You can inspect what our miracle randomizer generated using the Data Profiling/Ad Hoc Query item:

[Screenshot: The main thing is not to show it to analysts]

I will not go into detail about ETL sessions, everything is trivial there: we create a database with a table in it, wrap everything in a context manager, and now we do this:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # An exception inside the with-block marks the session as failed
        if exc_type is not None:
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

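    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            # DDL such as CREATE TABLE returns no rows, so there is nothing to fetch
            if cursor.description is None:
                return None
            return cursor.fetchone()[0]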

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass
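
The behaviour of this context manager can be checked without a PostgreSQL server. The following is a minimal sketch, not the class above: it assumes an in-memory SQLite database from the standard library, a simplified sessions table, and a hypothetical MiniSession class, and it only shows how __enter__ and __exit__ record the outcome of a load.

```python
import sqlite3


class MiniSession:
    """Hypothetical, simplified stand-in for Session, backed by SQLite."""

    def __init__(self, connection, task_name):
        self.connection = connection
        self.task_name = task_name
        self.id = None
        self.successful = None
        self.loaded_rows = None

    def __enter__(self):
        # Register the session before any data is loaded.
        cursor = self.connection.execute(
            'INSERT INTO sessions (task_name) VALUES (?)', (self.task_name,))
        self.id = cursor.lastrowid
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Any exception inside the with-block marks the session as failed.
        if exc_type is not None:
            self.successful = False
        self.connection.execute(
            'UPDATE sessions SET successful = ?, loaded_rows = ? WHERE id = ?',
            (self.successful, self.loaded_rows, self.id))
        # Returning None lets the exception propagate to the caller.


def make_connection():
    """Create an in-memory database with a simplified sessions table."""
    connection = sqlite3.connect(':memory:')
    connection.execute(
        'CREATE TABLE sessions ('
        'id INTEGER PRIMARY KEY, task_name TEXT NOT NULL, '
        'successful INTEGER, loaded_rows INTEGER)')
    return connection


connection = make_connection()

# A successful load: the body sets the result fields itself.
with MiniSession(connection, 'orders') as session:
    session.loaded_rows = 15
    session.successful = True

# A failed load: the exception is recorded and then re-raised.
try:
    with MiniSession(connection, 'orders') as session:
        raise ValueError('source is unavailable')
except ValueError:
    pass  # the failure has already been written to the sessions table

rows = connection.execute(
    'SELECT id, successful, loaded_rows FROM sessions ORDER BY id').fetchall()
print(rows)
```

The second session is stored as unsuccessful even though the body never touched session.successful, which is the point of putting the bookkeeping into __exit__.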

The time has come to collect our data from our hundred and fifty tables. Let's do this with the help of some very unpretentious lines:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)

  1. With the help of a hook we get a pymssql connection from Airflow.
  2. We substitute a restriction in the form of a date into the query; the template engine will pass it into the function.
  3. We feed our query to pandas, which hands us back a DataFrame; it will be useful to us later.

I use the {dt} substitution instead of the %s query parameter not because I am an evil Pinocchio, but because pandas cannot cope with pymssql and hands the latter params: List, although it really wants a tuple.
Also note that the developer of pymssql has decided to stop supporting it, so it is high time to move to pyodbc.
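
Because the date is interpolated into the SQL text rather than bound as a parameter, it is worth making sure that it really is a date. A minimal sketch, assuming a hypothetical helper build_orders_query that is not part of the article's code, and assuming dt arrives as the ISO date string rendered by the template engine:

```python
from datetime import date


def build_orders_query(dt):
    """Return the orders query for one day, rejecting anything that is not a date."""
    # date.fromisoformat raises ValueError for input that is not an ISO date,
    # so arbitrary SQL cannot slip into the interpolated string.
    day = date.fromisoformat(dt)
    return f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{day.isoformat()}'
    """


print(build_orders_query('2020-07-07'))
```

The query text is the same as before; the only change is that a malformed dt now fails early with a ValueError instead of reaching SQL Server.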

Let's see what Airflow has filled the arguments of our functions with:

[Screenshot: the task arguments as rendered by Airflow]

If there is no data, there is no point in continuing. But it is also strange to consider the load successful. Yet it is not an error either. A-a-ah, what to do?! Here is what:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that there is no error, but that we are skipping the task. In the interface the square will be neither green nor red, but pink.

Let's add a few columns to our data:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Namely:

  • The database from which we took the orders,
  • The ID of our load session (it will be different for every task),
  • A hash of the source and the order ID, so that in the final database (where everything is poured into one table) we have a unique order ID.
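
The idea behind hash_id can be illustrated without pandas. The sketch below is a stand-in built on assumptions, not what hash_pandas_object computes: it uses hashlib.blake2b from the standard library and a hypothetical helper order_hash_id to derive a deterministic 64-bit key from the source name and the order ID, so the same ID coming from two different sources gets two different keys. The source names are made up for the example.

```python
import hashlib


def order_hash_id(etl_source, order_id):
    """Deterministic signed 64-bit key built from the source name and order ID."""
    payload = f'{etl_source}|{order_id}'.encode('utf-8')
    digest = hashlib.blake2b(payload, digest_size=8).digest()
    # Interpret the 8 bytes as a signed integer so it fits a 64-bit INT column.
    return int.from_bytes(digest, byteorder='big', signed=True)


# The same order ID from two hypothetical sources.
key_a = order_hash_id('source_a', 42)
key_b = order_hash_id('source_b', 42)
print(key_a != key_b)
```

Unlike Python's built-in hash(), this value does not change between interpreter runs, which matters if the key is stored in a table and compared across loads.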

The last step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do this is via CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)

  1. We make a special receiver, StringIO.
  2. pandas kindly writes our DataFrame into it as CSV lines.
  3. We open a connection to our beloved Vertica with a hook.
  4. And now, with the help of copy(), we send our data straight to Vertica!
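
What ends up in the buffer can be seen with the standard library alone. A minimal sketch, assuming a hypothetical helper rows_to_csv_buffer that mimics the delimiter, quoting and NULL marker used above with csv.writer; the article itself relies on pandas for this step:

```python
import csv
from io import StringIO


def rows_to_csv_buffer(rows, null_marker='NUL'):
    """Write rows into an in-memory CSV buffer using '|' as the delimiter."""
    buffer = StringIO()
    writer = csv.writer(buffer, delimiter='|', quoting=csv.QUOTE_MINIMAL,
                        doublequote=False, escapechar='\\',
                        lineterminator='\n')
    for row in rows:
        # Missing values are written as the marker that COPY treats as NULL.
        writer.writerow([null_marker if value is None else value
                         for value in row])
    # Rewind so that the consumer reads from the beginning.
    buffer.seek(0)
    return buffer


buffer = rows_to_csv_buffer([(1, 'a|b', None), (2, 'plain', 3.5)])
print(buffer.read())
```

A value that contains the delimiter is wrapped in double quotes, which is why the COPY statement declares ENCLOSED '"'.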

We take from the driver the number of rows that were loaded and tell the session manager that everything is OK:

session.loaded_rows = cursor.rowcount
session.successful = True

That's all.

In production we create the target table by hand. Here I allowed myself a little automation:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Using VerticaOperator() I create the database schema and the table (if they do not already exist, of course). The main thing is to arrange the dependencies correctly:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Summing up

- Well, - said the little mouse, - isn't it true that now
You are convinced that I am the scariest beast in the forest?

Julia Donaldson, The Gruffalo

I think if my colleagues and I held a competition over who can build and launch an ETL process from scratch faster, they with their SSIS and a mouse and I with Airflow... And then we also compared the ease of maintenance... Wow, I think you will agree that I would beat them on all fronts!

To be a little more serious, Apache Airflow, by describing processes in the form of program code, has made my work much more comfortable and enjoyable.

Its unlimited extensibility, both in terms of plug-ins and in its predisposition to scaling, lets you use Airflow in almost any area: from the full cycle of collecting, preparing and processing data to launching rockets (to Mars, of course).

Final part, reference and information

The rakes we have stepped on for you

  • start_date. Yes, this is already a local meme. Everyone trips over start_date, the DAG's main argument. In short, if you specify the current date in start_date and one day in schedule_interval, the DAG will start tomorrow at the earliest.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    And no more problems.

    Another runtime error is associated with it: Task is missing the start_date parameter, which indicates that you forgot to bind the operator to the DAG.

  • Everything on one machine. Yes, the databases (Airflow's own and that of our wrapper), the web server, the scheduler and the workers. And it even worked. But over time the number of tasks for the services grew, and when PostgreSQL began to answer an index lookup in 20 s instead of 5 ms, we picked it up and moved it away.
  • LocalExecutor. Yes, we are still sitting on it, and we have already come to the edge of the abyss. LocalExecutor has been enough for us so far, but the time has come to expand by at least one worker, and we will have to work hard to move to CeleryExecutor. And since you can work with it on a single machine, nothing stops you from using Celery even on a server that "of course will never go into production, honestly!"
  • Not using the built-in tools:
    • Connections to store service credentials,
    • SLA Misses to react to tasks that did not complete on time,
    • XCom to exchange metadata (I said metadata!) between the tasks of a DAG.
  • Mail abuse. Well, what can I say? Alerts were set up for all retries of failed tasks. Now my work Gmail holds more than 90k emails from Airflow, and the web mail interface refuses to select and delete more than 100 at a time.
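
The start_date rake above comes down to simple arithmetic: the scheduler triggers a run for an interval only once that interval has ended. A minimal sketch, assuming a fixed timedelta schedule and a hypothetical helper first_trigger_time (it is not an Airflow function):

```python
from datetime import datetime, timedelta


def first_trigger_time(start_date, schedule_interval):
    """Earliest moment at which the first DAG run can be triggered."""
    # The first run covers [start_date, start_date + schedule_interval)
    # and is started only after that interval is over.
    return start_date + schedule_interval


start_date = datetime(2020, 7, 7, 0, 1, 2)
print(first_trigger_time(start_date, timedelta(days=1)))
```

With a daily schedule, a start_date of today therefore means the first run appears tomorrow at the earliest.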

More pitfalls: Apache Airflow Pitfalls

Automation tools

So that we can work even more with our heads and not with our hands, Airflow has prepared the following for us:

  • REST API: it still has Experimental status, which does not prevent it from working. With it you can not only get information about DAGs and tasks, but also stop/start a DAG, create a DAG Run or a pool.
  • CLI: many tools are available through the command line that are not merely inconvenient to use through the WebUI but are absent from it altogether. For example:
    • backfill is needed to rerun task instances.
      For example, the analysts come and say: "And you, comrade, have nonsense in the data from January 1 to 13! Fix it, fix it, fix it, fix it!" And you just go:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Database maintenance: initdb, resetdb, upgradedb, checkdb.
    • run, which lets you run a single task instance and even ignore all of its dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
    • test does much the same, only it also writes nothing to the databases.
    • connections allows mass creation of connections from the shell.
  • Python API: a rather hardcore way of interacting, which is intended for plug-ins and not for poking around in it with your little hands. But who is to stop us from going to /home/airflow/dags, running ipython and starting to mess around? You can, for example, export all connections with the following code:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connecting to the Airflow metadata database. I do not recommend writing to it, but getting task states for various specific metrics can be much faster and easier than through any of the APIs.

    Say, not all of our tasks are idempotent, and they can sometimes fail, which is normal. But several failures in a row are already suspicious and are worth checking.

    Beware, SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0
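
The logic of this query can be read more easily in plain Python. The sketch below is an illustration under assumptions, not a replacement for the SQL: the hypothetical function recent_failure_streaks takes rows shaped like (task_id, dag_id, execution_date, state), leaves the two-day window to the caller, and counts only the unbroken run of failures at the most recent end of each task's history.

```python
from collections import defaultdict

FAILURE_STATES = {'failed', 'up_for_retry'}


def recent_failure_streaks(task_instances):
    """Count the latest consecutive failures per (task_id, dag_id)."""
    by_task = defaultdict(list)
    for task_id, dag_id, execution_date, state in task_instances:
        by_task[(task_id, dag_id)].append((execution_date, state))

    streaks = {}
    for key, runs in by_task.items():
        runs.sort(key=lambda run: run[0], reverse=True)  # newest first
        streak = []
        for _, state in runs:
            if state not in FAILURE_STATES:
                break  # the first non-failure ends the streak
            streak.append(state)
        if streak:  # same role as HAVING count(last_fail_seq) > 0
            streaks[key] = {
                'unsuccessful': len(streak),
                'failed': streak.count('failed'),
                'up_for_retry': streak.count('up_for_retry'),
            }
    return streaks


# Made-up sample rows; ISO date strings sort chronologically.
sample = [
    ('load', 'orders', '2020-01-01', 'success'),
    ('load', 'orders', '2020-01-02', 'failed'),
    ('load', 'orders', '2020-01-03', 'up_for_retry'),
    ('clean', 'orders', '2020-01-02', 'failed'),
    ('clean', 'orders', '2020-01-03', 'success'),
]
print(recent_failure_streaks(sample))
```

A task whose latest run succeeded is not reported at all, even if it failed earlier, which mirrors the comparison of the two row_number() values in the query.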

References

And of course, the first ten links from Google's search results are the contents of the Airflow folder in my bookmarks.

And the links used in the article:

Source: will.com
