Apache Airflow: making ETL easier

Hi, I'm Dmitry Logvinenko - Data Engineer at the Analytics Department of the Vezet group of companies.

I'm going to tell you about a wonderful tool for developing ETL processes: Apache Airflow. Airflow is so convenient and multifaceted that it's worth a close look even if you don't deal with data flows at all, but periodically need to launch processes and monitor their execution.

And yes, I won't just tell, I'll also show: lots of code, screenshots, and recommendations.

(Image: what you usually see when you google the word Airflow / Wikimedia Commons)


Introduction

Apache Airflow is like Django:

  • written in Python,
  • has a great admin panel,
  • infinitely extensible

- only better, and it was made for completely different purposes, namely (as stated before the cut):

  • running and monitoring tasks on an unlimited number of machines (as many as Celery / Kubernetes and your conscience will allow you)
  • with dynamic workflow generation from Python code that is very easy to write and read
  • and the ability to connect any databases and APIs to each other, using both ready-made components and home-made plugins (which is extremely simple to do).

We use Apache Airflow like this:

  • we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (we have Vertica and Clickhouse);
  • like an advanced cron, it launches the data consolidation processes on the ODS and also monitors their upkeep.

Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow, this runs:

  • more than 200 dags (workflows proper, which we stuffed with tasks),
  • each with 70 tasks on average,
  • each starting (also on average) once an hour.

As for how we scaled, I'll write about that below, but for now let's define the über-problem we'll be solving:

There are three source SQL Servers, each with 50 databases - instances of a single project, which accordingly share the same schema (almost everywhere, mua-ha-ha), meaning each has an Orders table (luckily, a table with that name can be shoved into any business). We take the data, adding service fields (source server, source database, ETL task id), and naively throw it into, say, Vertica.

Let's go!

The main part, practical (and a bit theoretical)

Why we (and you) need it

Back when the trees were big and I was a simple SQL guy at a Russian retailer, we cranked out ETL processes, a.k.a. data flows, using the two tools available to us:

  • Informatica Power Center - an extremely sprawling system, extremely productive, with its own hardware and its own versioning. God forbid I used even 1% of its capabilities. Why? Well, first of all, that interface, looking like it came from somewhere in the 380s, put moral pressure on us. Secondly, this contraption is designed for extremely fancy processes, furious component reuse, and other very-important-enterprise tricks. About the fact that it costs as much as a wing of an Airbus A380 per year, we'll say nothing.

    Careful: the screenshot may slightly traumatize people under 30

    (screenshot)

  • SQL Server Integration Services - we used this comrade in our intra-project flows. Well, of course: we were already on SQL Server, and it would have been somehow improper not to use its ETL tools. Everything about it is good: the interface is pretty, and so are the progress reports... But that's not why we loved software products, oh, not for that. Versioning it via dtsx (which is XML with its nodes shuffled on save) we can do, but what's the point? How about making a package of tasks that drags a hundred tables from one server to another? Forget a hundred - your index finger will fall off from twenty of them, clicking the mouse button. But it definitely looks more fashionable:

    (screenshot)

We certainly looked for ways out. It almost even came to a self-written SSIS package generator...

...and then a new job found me. And at it, Apache Airflow caught up with me.

When I found out that ETL process descriptions are plain Python code, I all but danced for joy. This is how data streams finally got versioned and diffed, and pouring tables with a single schema from hundreds of databases into one target became a matter of Python code that fits on one and a half or two 13" screens.

Assembling the cluster

Let's not turn this into a complete kindergarten and not talk about utterly obvious things here, like installing Airflow, your chosen database, Celery, and the other cases described in the docs.

So that we can start experimenting right away, I sketched out a docker-compose.yml in which:

  • we actually raise Airflow: the Scheduler and the Webserver. Flower will also be spinning there, watching over the Celery tasks (because it's already stuffed into apache/airflow:1.10.10-python3.7, and we don't mind);
  • PostgreSQL, into which Airflow will write its service information (scheduler data, execution statistics, etc.) and in which Celery will mark completed tasks;
  • Redis, which will act as the task broker for Celery;
  • a Celery worker, which will handle the direct execution of tasks;
  • into the ./dags folder we'll put our files with dag descriptions. They are picked up on the fly, so there's no need to juggle the whole stack after every sneeze.

In some places, the code in the listings is not shown in full (so as not to clutter the text), and in some places it gets modified along the way. Complete working code examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • In assembling this composition, I relied heavily on the well-known image puckel/docker-airflow - be sure to check it out. Maybe you won't need anything else in your life.
  • All Airflow settings are available not only through airflow.cfg, but also through environment variables (thanks to the developers), which I maliciously took advantage of.
  • Naturally, it's not production-ready: I deliberately didn't put heartbeats on the containers and didn't bother with security. But I did the minimum suitable for our experiments.
  • Note that:
    • the dag folder must be accessible to both the scheduler and the workers;
    • the same goes for all third-party libraries: they must be installed on the machines with the scheduler and the workers.

Well, now it's simple:

$ docker-compose up --scale worker=3

After everything comes up, you can look at the web interfaces:

Basic concepts

If all these "dags" meant nothing to you, here's a short glossary:

  • Scheduler - the most important uncle in Airflow, managing the work of robots rather than humans: it monitors the schedule, updates dags, and launches tasks.

    Generally, in older versions it had memory problems (no, not amnesia, but leaks), and a legacy parameter even lingered in the configs: run_duration, its restart interval. But now everything is fine.

  • DAG (a.k.a. "dag") - a "directed acyclic graph"; such a definition will tell few people much, but in fact it's a container for tasks that interact with one another (see below), or an analogue of the Package in SSIS and the Workflow in Informatica.

    Besides dags, there may also be subdags, but we most likely won't get to them.

  • DAG Run - an initialized dag, which gets assigned its own execution_date. Dagruns of the same dag can happily run in parallel (provided, of course, you've made your tasks idempotent).
  • Operator - pieces of code responsible for performing one specific action. There are three types of operators:
    • action, like our favorite PythonOperator, which can execute any (valid) Python code;
    • transfer, which carry data from place to place, say MsSqlToHiveTransfer;
    • sensor, which, on the other hand, lets you react to an event or throttle further execution of the dag until it occurs. HttpSensor can poll the specified endpoint and, once the desired response arrives, start the transfer GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "Why? You can do retries right in the operator!" So as not to clog the task pool with hung operators. A sensor starts, checks, and dies until the next attempt.
  • Task - declared operators, regardless of type, that are attached to a dag get promoted to the rank of task.
  • Task instance - when the general scheduler decides it's time to send tasks into battle on the performer-workers (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (i.e., a set of variables: the execution parameters), expands the command or query templates, and pools them.
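
Tasks inside a dag get wired into a graph with the `>>` operator, which we'll use in the code below. As a purely illustrative toy (these are deliberately not Airflow's real classes), the mechanics boil down to overriding `__rshift__` so that each task remembers its downstream neighbors:

```python
# Toy sketch (NOT Airflow's real classes): a task remembers who runs
# after it, and >> is just syntactic sugar for recording those edges.
class ToyTask:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # supports task >> task and task >> [task1, task2], like Airflow
        targets = other if isinstance(other, list) else [other]
        self.downstream.extend(targets)
        return other

extract = ToyTask('extract')
load = ToyTask('load')
notify = ToyTask('notify')

extract >> load >> notify  # a tiny directed acyclic graph

print([t.task_id for t in extract.downstream])  # ['load']
```

In real Airflow the wiring is spelled identically, e.g. the `report_update >> [email, tg]` we'll meet further down.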

Making tasks

First we'll sketch the general scheme of our dag, and then we'll dive deeper and deeper into the details, because we apply some non-trivial solutions.

So, in its simplest form, such a dag will look like this:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's break it down:

  • first we import the necessary libs and a little something else;
  • sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases from which we'll take our table;
  • dag is the declaration of our dag, which must necessarily end up in globals(), otherwise Airflow won't find it. The dag also needs to state:
    • that its name is orders - this name will then show up in the web interface,
    • that it works from midnight on the eighth of July,
    • and that it should run approximately every 6 hours (for the tough guys, instead of timedelta() the cron line 0 0 0/6 ? * * * is acceptable; for the less cool, there are expressions like @daily);
  • workflow() will do the main job, but not right now. For now, our function will just dump the context into the log.
  • And now the simple magic of making tasks:
    • we loop over our sources;
    • and initialize a PythonOperator, which will execute our dummy workflow(). Don't forget to give the task a unique (within the dag) name and tie on the dag itself. The provide_context flag will, in turn, pour additional arguments into the function, which we'll carefully gather with **context.
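
The article doesn't show `commons/datasources.py` (the real one lives in the linked repository); a minimal sketch matching the `List[namedtuple[str, str]]` description might look like this. All connection and database names here are made up for illustration:

```python
from collections import namedtuple

# One entry per (Airflow connection, source database) pair: three
# source servers with 50 databases each, as in the über-problem.
Source = namedtuple('Source', ['conn_id', 'schema'])

sql_server_ds = [
    Source(f'mssql_{srv}', f'db_{srv}_{db}')
    for srv in range(3)
    for db in range(50)
]

print(len(sql_server_ds))  # 150 - the "one and a half hundred" tasks
```

Since `task_id=schema` in the dag above, the database names would have to be unique across servers, hence the `db_{srv}_{db}` pattern in this sketch.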

For now, that's all. What we got:

  • a new dag in the web interface,
  • one and a half hundred tasks that will be executed in parallel (if the Airflow settings, Celery, and the server capacity allow it).

Well, almost got.

(screenshot)
So who's going to install the dependencies?

To make all this simpler, I worked requirements.txt handling into docker-compose.yml on all the nodes.

Now we're rolling:

(screenshot)

The gray squares are task instances processed by the scheduler.

We wait a little while the workers snap up the tasks:

(screenshot)

The green ones, of course, finished their work successfully. The red ones weren't so lucky.

By the way, there's no ./dags folder on our prod and no syncing between machines: all the dags live in git on our Gitlab, and Gitlab CI rolls updates out to the machines on merge into master.

A little about Flower

While the workers are hammering away at our dummy tasks, let's recall another tool that can show us a thing or two: Flower.

The very first page, with summary information on the worker nodes:

(screenshot)

The busiest page, with the tasks that have gone off to work:

(screenshot)

The dullest page, with the state of our broker:

(screenshot)

The brightest page, with task status graphs and their execution times:

(screenshot)

Loading up the underloaded

So, all the tasks have run; the wounded can be carried off.

(screenshot)

And there were quite a few wounded, for various reasons. With correct use of Airflow, these very squares indicate that the data definitely did not arrive.

You need to look at the log and restart the fallen task instances.

By clicking on any square, we'll see the actions available to us:

(screenshot)

You can simply take and Clear the fallen one. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

(screenshot)

Clearly, doing this with the mouse on all the red squares isn't very humane, and it's not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances

(screenshot)

Let's select them all at once and reset them to zero by clicking the right item:

(screenshot)

After the cleanup, our taxis (read: tasks) look like this (they can hardly wait for the scheduler to schedule them):

(screenshot)

Connections, hooks, and other variables

It's time to look at the next DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Good sirs, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Everyone has done a report refresh at some point, right? Here it is again: there's a list of sources to take the data from; there's a list of places to put it; and don't forget to honk when everything worked or broke (well, that's not about us, no).

Let's go through the file once more and look at the new obscure bits:

  • from commons.operators import TelegramBotSendMessage - nothing prevents us from making our own operators, which we took advantage of by building a small wrapper for sending messages to the Blocked One (Telegram). (We'll talk more about this operator below);
  • default_args={} - a dag can hand out the same arguments to all of its operators;
  • to='{{ var.value.all_the_kings_men }}' - the to field won't be hardcoded but generated dynamically with Jinja from a variable holding a list of emails, which I carefully put into Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - the condition for starting the operator. In our case, the letter will fly to the bosses only if all the dependencies have completed successfully;
  • tg_bot_conn_id='tg_main' - the conn_id arguments take the IDs of connections that we create in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - messages in Telegram will fly out only if there are fallen tasks;
  • task_concurrency=1 - we forbid the simultaneous launch of several task instances of the same task. Otherwise we would get several VerticaOperator (each looking at one table) running at once;
  • report_update >> [email, tg] - all the VerticaOperators converge on sending the letter and the message, like this:
    (screenshot)

    But since the notifier operators have different trigger conditions, only one will fire. In the Tree View, everything looks a bit less visual:
    (screenshot)

I'll say a couple of words about macros and their friends, variables.

Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} will expand to the contents of the context variable execution_date in the YYYY-MM-DD format: 2020-07-14. The best part is that the context variables are nailed to a specific task instance (its square in the Tree View), and on a restart the placeholders will expand to exactly the same values.
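
In plain Python terms (a crude stand-in for what Jinja actually does, just to show which value lands in the query), `ds` is nothing more than the run's execution_date formatted as a date:

```python
from datetime import datetime

# The execution_date pinned to a task instance...
execution_date = datetime(2020, 7, 14, 6, 0)

# ...and the macros Airflow derives from it:
context = {
    'ds': execution_date.strftime('%Y-%m-%d'),       # '2020-07-14'
    'ds_nodash': execution_date.strftime('%Y%m%d'),  # '20200714'
}

query = "WHERE payment_dtm::DATE = '{{ ds }}'::DATE"
rendered = query.replace('{{ ds }}', context['ds'])  # crude Jinja stand-in
print(rendered)
```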

The assigned values can be viewed with the Rendered button on each task instance. Here's how the task that sends the letter looks:

(screenshot)

And here's the task that sends the message:

(screenshot)

A full list of the built-in macros for the latest version is available here: macros reference

Moreover, with the help of plugins we can declare our own macros, but that's a whole other story.

Besides the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of entries in Admin/Variables:

(screenshot)

That's it - go ahead and use them:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can be JSON. In the case of JSON:

bot_config

{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}

just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
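
That Jinja path is just nested key access into the stored JSON; the same thing in plain Python looks as follows (and in operator code, rather than templates, Airflow exposes the same variable via Variable.get('bot_config', deserialize_json=True)):

```python
import json

# The JSON stored under the bot_config Variable, as shown above
bot_config = json.loads('''
{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}
''')

# {{ var.json.bot_config.bot.token }} resolves to:
token = bot_config['bot']['token']
print(token)
```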

I will literally say one word and show one screenshot about connections. Everything here is elementary: on the Admin/Connections page we create a connection and add our logins/passwords and more specific parameters there. Like this:

(screenshot)

Passwords can be encrypted (more thoroughly than by default), and you can even leave the connection type out (as I did for tg_main). The thing is, the list of types is hardwired into Airflow's models and cannot be extended without getting into the source code (if I simply failed to google something, please correct me), but nothing stops us from getting credentials just by name.

You can create several connections with the same name: in that case, the method BaseHook.get_connection(), which fetches connections by name, will hand out a random one from among the namesakes (it would be more logical to do Round Robin, but let's leave that on the conscience of the Airflow developers).
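
The difference between what we get and the Round Robin the author would prefer is easy to show in stdlib terms (the connection names here are made up):

```python
import itertools
import random

# Three connections registered under one and the same name
namesakes = ['conn-a', 'conn-b', 'conn-c']

# What get_connection() effectively does with namesakes: a random pick
picked = random.choice(namesakes)

# A Round Robin would instead hand them out in turn:
rr = itertools.cycle(namesakes)
rotation = [next(rr) for _ in range(4)]
print(picked, rotation)
```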

Variables and Connections are certainly nice tools, but it's important not to lose the balance: which parts of your flows you keep in the code itself, and which parts you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, say a mailbox, through the UI. On the other hand, this is again a return to the mouse click, which we (I) wanted to get away from.

Working with connections is one of the jobs of hooks. In general, hooks are points for plugging Airflow into third-party services and libraries. For example, JiraHook will open a client for us to interact with Jira (you can move tasks back and forth), and with SambaHook you can push a local file to an smb share.

Dissecting the custom operator

And so we've come close to looking at how TelegramBotSendMessage is made.

The code of commons/operators.py with the operator itself:

from typing import Union

from airflow.models import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, as with everything else in Airflow, it's all very simple:

  • We inherit from BaseOperator, which implements quite a few Airflow-specific things (look them over at your leisure);
  • We declare the fields in template_fields, where Jinja will look for macros to process;
  • We arrange the right arguments for __init__() and set defaults where needed;
  • We don't forget to initialize the ancestor either;
  • We open the corresponding hook, TelegramBotHook, and get a client object from it;
  • We override the method BaseOperator.execute(), which Airflow will jerk when the time comes to launch the operator; in it we implement the main action, not forgetting to log. (We log, by the way, right to stdout and stderr: Airflow will intercept everything, wrap it beautifully, and file it where needed.)

Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I just don't know what there is to explain here; I'll simply note the important points:

  • We inherit and think about the arguments; in most cases there will be one: conn_id;
  • Overriding standard methods: I limited myself to get_conn(), in which I get the connection parameters by name and simply pull out the extra section (a JSON field), into which I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • I create an instance of our TelegramBot, handing it that specific token.

That's all. You can get a client from the hook with TelegramBotHook().client or TelegramBotHook().get_conn().

And here's the second part of the file, where I make a micro-wrapper for the Telegram REST API, so as not to drag in the whole python-telegram-bot for the sake of a single sendMessage method.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

The right way would be to add all of this (TelegramBotSendMessage, TelegramBotHook, TelegramBot) to a plugin, put it in a public repository, and hand it over to Open Source.

While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. I'll go check whether it's a false alarm...

(screenshot)
Something broke in our dag! Isn't that just what we expected? Exactly!

So, are you going to pour?

Feel like I missed something? It seems he promised to transfer data from SQL Server to Vertica, and then he up and went off topic, the scoundrel!

This villainy was intentional; I simply had to decipher some terminology for you first. Now you can go further.

Our plan was this:

  1. Make a dag
  2. Generate tasks
  3. Marvel at how beautiful everything is
  4. Assign session numbers to the loads
  5. Fetch the data from SQL Server
  6. Put the data into Vertica
  7. Collect statistics
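
Steps 4 through 7 of this plan can be sketched as a skeleton of the future workflow(). Every helper below is a made-up stub, not code from the article's repository; it only shows the order in which the pieces snap together:

```python
def open_session(task_name):
    """Step 4: register a load session, get back its number (stub)."""
    return 42

def fetch_rows(conn_id, schema):
    """Step 5: pull rows from a source SQL Server (stub)."""
    return [('order-1',), ('order-2',)]

def insert_rows(rows, session_id):
    """Step 6: pour rows into Vertica, tagged with the session (stub)."""
    return len(rows)

def workflow(conn_id, schema):
    session_id = open_session(f'{conn_id}.{schema}')
    rows = fetch_rows(conn_id, schema)
    loaded = insert_rows(rows, session_id)
    # Step 7: hand the statistics back
    return {'session': session_id, 'loaded_rows': loaded}

print(workflow('mssql_0', 'db_0'))
```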

So, to get all this up and running, I made a small addition to our docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: 'Y'
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Here we raise:

  • Vertica as the dwh host, with the most default of settings,
  • three instances of SQL Server,
  • and we fill the databases in the latter with some data (do not, under any circumstances, look into mssql_init.py!)

We launch all this goodness with a slightly more complicated command than last time:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

To see what our miracle randomizer generated, you can use the Data Profiling/Ad Hoc Query item:

(screenshot)
The main thing is not to show this to the analysts

I won't dwell on ETL sessions in detail; everything there is elementary: we make a database, put a table in it, wrap everything in a context manager, and now we do this:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any((exc_type, exc_val, exc_tb)):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

The time has come to collect data from our one and a half tables. Let's do it with a few quite unpretentious lines:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. With a hook we get a pymssql connection from Airflow
  2. We substitute a date-shaped restriction into the query; the templating engine will throw it into the function.
  3. We feed our query to pandas, which gets us a DataFrame; it will come in handy later.

I use the substitution {dt} instead of a query parameter %s not because I'm an evil Pinocchio, but because pandas can't cope with pymssql and slips it params: List, while pymssql really wants a tuple.
Also note that the developer of pymssql has decided not to support it any longer, and it's high time to move to pyodbc.
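For illustration, here is a minimal sketch of what the properly parameterized variant of the query above could look like. The helper name is hypothetical, and the `?` placeholder is pyodbc's DB-API paramstyle (pymssql would use `%s`); in the article's actual code the date is templated into the string instead.

```python
# Hypothetical helper (not from the article): builds the same daily-orders
# query, but with a real DB-API parameter instead of an f-string substitution.
def build_orders_query(dt):
    """Return (sql, params) for fetching one day of orders.

    '?' is pyodbc's placeholder style; pymssql uses '%s' and insists on a
    tuple for params, which is exactly what pandas fails to hand it,
    slipping a list instead.
    """
    sql = """
        SELECT id, start_time, end_time, type, data
        FROM dbo.Orders
        WHERE CONVERT(DATE, start_time) = ?
        """
    return sql, (dt,)  # a tuple, not a list

# With pyodbc this would then be: pd.read_sql_query(sql, conn, params=params)
sql, params = build_orders_query('2020-07-07')
```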

Let's see what Airflow stuffed our function's arguments with:

(screenshot of the rendered task arguments)

If there is no data, there is no point in continuing. On the other hand, it's also strange to consider the fill successful. Yet this is not an error either. A-ah-ah, what to do?! Here's what:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that there is no error, but we are skipping the task. The interface will show not a green or a red square, but a pink one.

Let's toss our data a few extra columns:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Namely:

  • The schema from which we took the orders,
  • The ID of our flood session (it is different for every task),
  • A hash of the source and the order ID, so that in the final database (where everything is poured into one table) we get a unique order ID.
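The idea behind `hash_id` can be shown with the standard library alone. This sketch uses `hashlib` instead of the pandas `hash_pandas_object` call from the snippet above, so the actual hash values differ, but the property is the same: source plus order id gives a stable, per-row unique key.

```python
import hashlib

def make_hash_id(etl_source, order_id):
    """Stable surrogate key built from the source schema and the
    source-local order id (illustration only; the article uses pandas'
    hash_pandas_object, whose values differ from these)."""
    payload = f'{etl_source}:{order_id}'.encode()
    # Truncate the digest so it fits an integer column in the target table.
    return int.from_bytes(hashlib.sha256(payload).digest()[:8], 'big')

# The same order id in two different source databases gives two different
# keys, while re-running a load reproduces the same key every time.
a = make_hash_id('mssql_0', 42)
b = make_hash_id('mssql_1', 42)
```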

The final stage remains: pour everything into Vertica. And, strangely enough, one of the best ways to do that is via CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. We create a special buffer StringIO.
  2. pandas kindly puts our DataFrame into it in the form of CSV lines.
  3. We open a connection to our beloved Vertica with a hook.
  4. And now, with the help of copy(), we send our data straight to Vertica!
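Steps 1 and 2 can be seen in miniature with just the standard library; the delimiter and the NULL marker here match the COPY statement above, and the row values are made up for the example.

```python
import csv
from io import StringIO

rows = [
    (1, '2020-07-07 10:00', 'delivery', None),
    (2, '2020-07-07 11:30', 'pickup', 'note'),
]

# Write '|'-separated lines into an in-memory buffer; the NULL marker
# matches the NULL 'NUL' clause of the COPY statement.
buffer = StringIO()
writer = csv.writer(buffer, delimiter='|', quoting=csv.QUOTE_MINIMAL)
for row in rows:
    writer.writerow('NUL' if v is None else v for v in row)
buffer.seek(0)  # rewind so the reader (cursor.copy) starts at the top

# cursor.copy(copy_stmt, buffer) would now stream this straight to Vertica
```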

We'll take from the driver how many rows were filled in, and tell the session manager that everything is fine:

session.loaded_rows = cursor.rowcount
session.successful = True

That's all.

In production, of course, we create the target table by hand. Here I allowed myself a small automation:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Using VerticaOperator() I create the target schema and table (if they don't already exist, of course). The main thing is to arrange the dependencies correctly:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Summing up

"Well," said the little mouse, "don't you now
think that I'm the scariest beast in the forest?"

Julia Donaldson, The Gruffalo

I suspect that if my colleagues and I had a competition over who would create and launch an ETL process from scratch faster, them with their SSIS and a mouse and me with Airflow... And then we would also compare ease of maintenance... Wow, I think you'll agree that I would beat them on all fronts!

A bit more seriously: by describing processes in the form of program code, Apache Airflow has made my job much more comfortable and enjoyable.

Its unlimited extensibility, both in terms of plug-ins and in its predisposition to scalability, gives you the opportunity to use Airflow in almost any area: even in the full cycle of collecting, preparing and processing data, even in launching rockets (to Mars, of course).

Closing part, references and information

The rakes we have collected for you

  • start_date. Yes, this is already a local meme. Everyone trips over Doug's main argument, start_date. In short: if you put today's date into start_date and one day into schedule_interval, the DAG will start tomorrow, not earlier.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    And no more problems.

    There is also a runtime error associated with it: Task is missing the start_date parameter, which most often indicates that you forgot to bind the operator to the dag.

  • Everything on one machine. Yes, the databases (Airflow's own and our overlay), a web server, the scheduler, and the workers. And it even worked. But over time the number of tasks for the services grew, and when PostgreSQL started responding to an index in 20 s instead of 5 ms, we picked the database up and carried it away.
  • LocalExecutor. Yes, we are still sitting on it, and we have already reached the edge of the abyss. LocalExecutor has been enough for us so far, but now it's time to scale out with at least one more worker, and we'll have to work hard to move to CeleryExecutor. And given that you can work with it on a single machine, nothing stops you from using Celery even on a server that "of course will never go to production, honestly!"
  • Not using the built-in tools:
    • Connections to store service credentials,
    • SLA Misses to react to tasks that did not finish in time,
    • XCom to exchange metadata (I said metadata!) between dag tasks.
  • Mail abuse. Well, what can I say? Alerts were set up for every retry of failed tasks. Now my work Gmail holds >90k emails from Airflow, and the web-mail interface refuses to pick up and delete more than 100 of them at a time.
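The start_date gotcha from the first point can be made concrete: with a daily schedule_interval, the first run covers the interval [start_date, start_date + interval) and is only fired once that whole interval has passed, i.e. "tomorrow". A toy calculation (Airflow's real scheduling logic is more involved; this only illustrates the arithmetic):

```python
from datetime import datetime, timedelta

def first_run_fires_at(start_date, schedule_interval):
    """A scheduler fires a run only after its whole data interval
    [start_date, start_date + interval) has passed."""
    return start_date + schedule_interval

start_date = datetime(2020, 7, 7, 0, 1, 2)
fires = first_run_fires_at(start_date, timedelta(days=1))
# A DAG created "today" with a daily interval fires no earlier than tomorrow.
```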

More pitfalls: Apache Airflow Pitfalls

More automation tools

So that we work even more with our heads and not with our hands, Airflow has prepared this for us:

  • REST API: it still has Experimental status, which doesn't stop it from working. With it you can not only get information about dags and tasks, but also stop/start a dag, create a DAG Run or a pool.
  • CLI: many tools are available from the command line that are not just inconvenient to use through the WebUI but simply absent from it. For example:
    • backfill is needed to restart task instances.
      For example, the analysts come and say: "And your data, comrade, from January 1 to 13 is nonsense! Fix it, fix it, fix it, fix it!" And you just go: hop:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Database maintenance: initdb, resetdb, upgradedb, checkdb.
    • run, which lets you run a single task instance and even ignore all of its dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
    • test does pretty much the same thing, only it also writes nothing to the databases.
    • connections allows mass creation of connections from the shell.
  • Python API: a rather hardcore way of interacting, intended for plugins and not to be poked at with little hands. But who's to stop us from going to /home/airflow/dags, running ipython and starting to mess around? You can, for example, export all connections with this code:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
        d = {field: getattr(conn, field) for field in fields}
        print(conn.conn_id, '=', d)
  • Connecting to the Airflow metadatabase. I don't recommend writing to it, but fetching task states for various specific metrics can be much faster and easier than going through any of the APIs.

    Let's say not all of our tasks are idempotent; some may fall over now and then, and that's normal. But a few pile-ups in a row are already suspicious, and it would be worth checking.

    Watch out, SQL!

    WITH last_executions AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            row_number()
                OVER (
                    PARTITION BY task_id, dag_id
                    ORDER BY execution_date DESC) AS rn
        FROM public.task_instance
        WHERE
            execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

References

And in fact, the first ten links from Google's search results are the contents of the Airflow folder from my bookmarks.

And the links used in the article:

Source: www.habr.com
