Apache Airflow: Making ETL Easier

Hi, I'm Dmitry Logvinenko, a Data Engineer in the Analytics Department of the Vezet group of companies.

I'm going to tell you about a wonderful tool for developing ETL processes: Apache Airflow. Airflow is so versatile and multifaceted that it deserves a closer look even if you don't deal with data flows at all, but simply need to launch some processes on a schedule and monitor their execution.

And yes, I won't just tell, I'll also show: the article has plenty of code, screenshots and recommendations.

What you usually see when you google the word Airflow / Wikimedia Commons

Introduction

Apache Airflow is just like Django:

  • written in Python,
  • has a great admin panel,
  • infinitely extensible,

- only better, and built for completely different purposes, namely (as it says before the cut):

  • running and monitoring tasks on an unlimited number of machines (as many as Celery / Kubernetes and your conscience will allow)
  • dynamic workflow generation from Python code that is very easy to write and understand
  • the ability to connect any databases and APIs to each other using both ready-made components and home-made plugins (which is extremely simple).

We use Apache Airflow like this:

  • we collect data from various sources (many SQL Server and PostgreSQL instances, various APIs with application metrics, even 1C) into the DWH and ODS (we have Vertica and Clickhouse);
  • as an advanced cron that launches the data consolidation processes on the ODS and also monitors their maintenance.

Until recently, a single small server with 32 cores and 50 GB of RAM covered our needs. In Airflow, it runs:

  • more than 200 DAGs (workflows, really, which we stuffed with tasks),
  • each with 70 tasks on average,
  • and all this goodness starts (again, on average) once an hour.

I'll write about how we scaled further below; for now, let's define the über-problem we are going to solve:

There are three source SQL Servers, each with 50 databases. These are instances of one project, so they share the same structure (almost everywhere, mua-ha-ha), which means each one has an Orders table (luckily, a table with that name can be shoved into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively throw it into, say, Vertica.

Let's go!

The main part, practical (and a little theoretical)

Why we (and you) need it

When the trees were big and I was a simple SQL guy at a Russian retail chain, we cranked out ETL processes, aka data flows, using the two tools available to us:

  • Informatica Power Center - an extremely sprawling, extremely productive system with its own hardware and its own versioning. I used, God forbid, 1% of its capabilities. Why? Well, first of all, the interface, from somewhere in the 2000s, weighed on us mentally. Secondly, this contraption is designed for extremely fancy processes, furious component reuse and other very-important-enterprise tricks. As for the fact that it costs about as much as the wing of an Airbus A380 per year, we won't say anything.

    Beware, the screenshot may slightly hurt people under 30

  • SQL Server Integration Services - we used this comrade in our intra-project flows. Well, really: we already use SQL Server, and it would be somehow unreasonable not to use its ETL tools. Everything in it is good: the interface is pretty, and so are the progress reports... But that's not why we love software products, oh, not for that. Versioning dtsx (which is XML with its nodes shuffled on every save) is something we can do, but what's the point? How about making a task package that drags a hundred tables from one server to another? Never mind a hundred, your index finger will fall off after twenty, clicking the mouse button. But it definitely looks more fashionable.

We certainly looked for ways out. It even almost came to a self-written SSIS package generator...

...and then a new job found me. And Apache Airflow caught up with me there.

When I found out that ETL process descriptions are plain Python code, I all but danced for joy. That is how data flows became versioned and diffable, and pouring tables with a single structure from hundreds of databases into one target became a matter of Python code that fits in one and a half or two 13-inch screens.

Assembling the cluster

Let's not turn this into a complete kindergarten and talk about totally obvious things like installing Airflow, the database of your choice, Celery and other matters described in the docs.

So that we can start experimenting right away, I sketched a docker-compose.yml in which:

  • We bring up Airflow itself: the Scheduler and the Webserver. Flower will also spin there to monitor Celery tasks (because it is already bundled into apache/airflow:1.10.10-python3.7, and we don't mind);
  • PostgreSQL, where Airflow will write its service information (scheduler data, execution statistics, etc.) and Celery will mark completed tasks;
  • Redis, which will act as the task broker for Celery;
  • A Celery worker, which will do the actual execution of tasks.
  • Into the ./dags folder we will put our files with DAG descriptions. They will be picked up on the fly, so there is no need to restart the whole stack after every sneeze.

In some places the code in the examples is not shown in full (so as not to clutter the text), and in some places it is modified along the way. Complete working examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • In assembling the composition I relied heavily on the well-known image puckel/docker-airflow - be sure to check it out. Maybe you won't need anything else in your life.
  • All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I shamelessly took advantage of.
  • Naturally, this is not production-ready: I deliberately did not put heartbeats on the containers and did not bother with security. But I did the minimum that suits us experimenters.
  • Note that:
    • The DAG folder must be accessible to both the scheduler and the workers.
    • The same goes for all third-party libraries - they must be installed on every machine that runs a scheduler or a worker.
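The environment-variable convention used in the compose file can be sketched in a few lines. Airflow treats variables named AIRFLOW__{SECTION}__{KEY} as overrides for the matching airflow.cfg option. The helper split_airflow_env below is hypothetical and only illustrates the naming scheme; it is not part of Airflow.

```python
from typing import Optional, Tuple

PREFIX = "AIRFLOW__"


def split_airflow_env(name: str) -> Optional[Tuple[str, str]]:
    """Map AIRFLOW__SECTION__KEY to ('section', 'key'); None if the name does not match."""
    if not name.startswith(PREFIX):
        return None
    # The first double underscore after the prefix separates section from key.
    section, sep, key = name[len(PREFIX):].partition("__")
    if not sep or not section or not key:
        return None
    return section.lower(), key.lower()


# A few variables as they might appear in a container environment.
environment = {
    "AIRFLOW__CORE__EXECUTOR": "CeleryExecutor",
    "AIRFLOW__CELERY__BROKER_URL": "redis://broker:6379/0",
    "PATH": "/usr/bin",  # unrelated variable, ignored
}

config = {}
for env_name, value in environment.items():
    parsed = split_airflow_env(env_name)
    if parsed:
        section, key = parsed
        config.setdefault(section, {})[key] = value

print(config)
```

Read this way, AIRFLOW__CORE__EXECUTOR is simply the executor option of the [core] section.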

Well, now it's simple:

$ docker-compose up --scale worker=3

Once everything is up, you can look at the web interfaces: the Airflow Webserver on port 8080 and Flower on port 5555, as mapped in docker-compose.yml.

Basic concepts

If you didn't understand anything in all these "dags", here is a short glossary:

  • Scheduler - the most important guy in Airflow, who makes sure that the robots work hard, not the people: it watches the schedule, updates the DAGs and launches tasks.

    In older versions it had memory problems (no, not amnesia, leaks), and a legacy parameter even remained in the configs: run_duration, its restart interval. But now everything is fine.

  • DAG (aka "dag") - a "directed acyclic graph", but that definition tells few people anything. In practice it is a container for tasks that interact with each other (see below), or an analogue of a Package in SSIS and a Workflow in Informatica.

    In addition to DAGs there can also be subdags, but we most likely won't get to them.

  • DAG Run - an initialized DAG that has been assigned its own execution_date. DAG runs of the same DAG can work in parallel (if you made your tasks idempotent, of course).
  • Operator - a piece of code responsible for performing a specific action. There are three types of operators:
    • action, like our favorite PythonOperator, which can execute any (valid) Python code;
    • transfer, which moves data from place to place, say, MsSqlToHiveTransfer;
    • sensor, which lets you react to an event or hold back further execution of the DAG until the event occurs. HttpSensor can poll a given endpoint and, once the desired response arrives, kick off the transfer GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "Why? You can do the retries right in the operator!" The answer: so as not to clog the task pool with suspended operators. The sensor starts, checks and dies until the next attempt.
  • Task - declared operators, regardless of type, are promoted to the rank of task once they are attached to a DAG.
  • Task instance - when the general-scheduler decides it is time to send tasks into battle on the performer-workers (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (i.e. a set of variables, the execution parameters), expands the command or query templates, and puts them into the pool.
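To make the "directed acyclic graph" part of the glossary concrete, here is a toy illustration using only the standard library (graphlib, Python 3.9+). It is not how Airflow schedules tasks internally, and the task names are made up; it only shows that a set of tasks with dependencies can be ordered as long as there is no cycle.

```python
from graphlib import CycleError, TopologicalSorter  # standard library, Python 3.9+

# Each key is a task; the set holds the tasks it depends on (its upstream).
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "notify": {"load"},
}

# A valid execution order always places a task after everything it depends on.
order = list(TopologicalSorter(dependencies).static_order())
print(order)

# Two tasks that wait for each other form a cycle, so no order exists.
cyclic = {"a": {"b"}, "b": {"a"}}
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print("cycle detected:", has_cycle)
```

The second half is the reason for the word "acyclic": a graph with a loop cannot be turned into a run order at all.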

Generating tasks

First, let's outline the general scheme of our DAG, and then dive deeper and deeper into the details, because we apply some non-trivial solutions.

So, in its simplest form, such a DAG will look like this:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Let's break it down:

  • First, we import the necessary libs and a few other things;
  • sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases from which we will take our table;
  • dag - the declaration of our DAG, which must be in globals(), otherwise Airflow won't find it. The DAG also needs to state:
    • that its name is orders - this name will then appear in the web interface,
    • that it will work starting from midnight on the eighth of July,
    • and that it should run roughly every 6 hours (for the tough guys, a cron expression such as 0 */6 * * * is accepted here instead of timedelta(); for the less cool, an expression like @daily);
  • workflow() will do the main job, but not now. For now, we'll just dump our context into the log.
  • And now the simple magic of creating tasks:
    • we loop over our sources;
    • we initialize a PythonOperator that will execute our dummy workflow(). Don't forget to give the task a name that is unique within the DAG and to attach the DAG itself. The provide_context flag, in turn, pours additional arguments into the function, which we carefully collect with **context.
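The article does not show commons/datasources.py, so the following is only a guess at its shape: a sketch that enumerates three servers with 50 databases each, as in the problem statement. The record type DataSource, the function build_sources, the connection IDs mssql_0..mssql_2 and the database names are all assumptions made for illustration.

```python
from collections import namedtuple
from typing import List

# Hypothetical record type: Airflow connection ID plus database (schema) name.
DataSource = namedtuple("DataSource", "conn_id schema")


def build_sources(servers: int = 3, databases: int = 50) -> List[DataSource]:
    """Enumerate every (server, database) pair from the problem statement."""
    return [
        DataSource(conn_id=f"mssql_{s}", schema=f"db_{s}_{d:02d}")
        for s in range(servers)
        for d in range(databases)
    ]


sql_server_ds = build_sources()

# The DAG uses the schema as task_id, so schema names must be unique across servers.
task_ids = [schema for conn_id, schema in sql_server_ds]
assert len(task_ids) == len(set(task_ids))

print(len(sql_server_ds), sql_server_ds[0])
```

Because a namedtuple unpacks like a plain tuple, the loop `for conn_id, schema in sql_server_ds` from the DAG works unchanged with such a list.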

That's all for now. What we got:

  • a new DAG in the web interface,
  • one and a half hundred tasks that will run in parallel (if the Airflow and Celery settings and the server capacity allow it).

Well, almost got it.

Who will install the dependencies?

To simplify this whole thing, I wired the processing of requirements.txt on all nodes into docker-compose.yml.

Now it's off.

Gray squares are task instances that have been processed by the scheduler.

We wait a bit, and the workers snap up the tasks.

The green ones, of course, have completed their work successfully. The red ones, not so much.

By the way, there is no ./dags folder on our prod and no file synchronization between machines: all DAGs live in git on our Gitlab, and Gitlab CI distributes updates to the machines on merge into master.

A little about Flower

While the workers are hammering away at our dummies, let's recall another tool that can show us something - Flower.

The very first page has summary information on the worker nodes.

The most intense page lists the tasks that went to work.

The most boring page shows the status of our broker.

The brightest page has the task status graphs and their execution times.

Loading the underloaded

So, all the tasks have run, and the wounded can be carried away.

And there turned out to be many wounded, for one reason or another. When Airflow is used correctly, these very squares indicate that the data definitely did not arrive.

You need to look at the log and restart the fallen task instances.

Clicking on any square shows the actions available to us.

You can Clear the fallen. That is, we forget that something failed there, and the same task instance goes back to the scheduler.

Clearly, doing this with the mouse for every red square is not very humane, and it is not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances.

Let's select everything at once and reset to zero by clicking the right item.

After the cleanup, our tasks are waiting for the scheduler to schedule them again.

Connections, hooks and other variables

It's time to look at the next DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Good gentlemen, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Natasha, wake up, we dropped {{ dag.dag_id }}
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Has everyone done a report update at some point? Here it is again: there is a list of sources to take the data from; there is a list of places to put it; and don't forget to honk when everything has happened or broken (well, that's not about us, no).

Let's go through the file again and look at the new obscure bits:

  • from commons.operators import TelegramBotSendMessage - nothing stops us from making our own operators, which we took advantage of by making a small wrapper for sending messages to Telegram. (We will talk more about this operator below);
  • default_args={} - a DAG can hand the same arguments to all of its operators;
  • to='{{ var.value.all_the_kings_men }}' - the to field is not hardcoded but generated dynamically using Jinja and a variable with a list of emails, which I carefully put into Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - the condition for starting the operator. In our case, the letter flies off to the bosses only if all dependencies completed successfully;
  • tg_bot_conn_id='tg_main' - conn_id arguments accept the IDs of connections that we create in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - messages to Telegram fly off only if there are failed tasks;
  • task_concurrency=1 - we forbid several task instances of the same task from running at once. Otherwise we would get several VerticaOperator instances running simultaneously (against the same table);
  • report_update >> [email, tg] - all the VerticaOperator tasks converge on sending the letter and the message.

    But since the notifier operators have different trigger conditions, only one of them will fire. In the Tree View everything looks a little less visual.
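Why only one of the two notifiers fires can be shown with a simplified model of the two trigger rules. This is a sketch, not Airflow's implementation: the function should_run is hypothetical, it looks only at final upstream states, and the real scheduler handles more states (and lets ONE_FAILED fire as soon as one upstream task fails, without waiting for the rest).

```python
from typing import Iterable

SUCCESS, FAILED = "success", "failed"


def should_run(trigger_rule: str, upstream_states: Iterable[str]) -> bool:
    """Simplified model of two trigger rules over the final upstream states."""
    states = list(upstream_states)
    if trigger_rule == "all_success":
        return all(state == SUCCESS for state in states)
    if trigger_rule == "one_failed":
        return any(state == FAILED for state in states)
    raise ValueError(f"rule not modelled in this sketch: {trigger_rule}")


# Five report updates, as in update_reports.py.
good_run = [SUCCESS] * 5
bad_run = [SUCCESS, SUCCESS, FAILED, SUCCESS, SUCCESS]

for label, states in (("good", good_run), ("bad", bad_run)):
    print(label,
          "email:", should_run("all_success", states),
          "telegram:", should_run("one_failed", states))
```

For any set of final states the two conditions are mutually exclusive, which is why the email and the Telegram message never go out together.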

I'll say a few words about macros and their friends, variables.

Macros are Jinja placeholders that can substitute various useful information into operator arguments. For example, like this:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} expands to the contents of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are pinned to a specific task instance (a square in the Tree View), so on a restart the placeholders expand to the same values.
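The "same values on restart" property follows from the fact that ds is derived from the task instance's execution_date, not from the wall clock. A minimal stand-in, assuming plain string replacement instead of Jinja (the function render_ds is hypothetical):

```python
from datetime import datetime

QUERY_TEMPLATE = (
    "SELECT id FROM orders.payments "
    "WHERE payment_dtm::DATE = '{{ ds }}'::DATE"
)


def render_ds(template: str, execution_date: datetime) -> str:
    """Stand-in for Jinja: substitute only the {{ ds }} placeholder."""
    ds = execution_date.strftime("%Y-%m-%d")
    return template.replace("{{ ds }}", ds)


# The execution_date belongs to the task instance, so rendering the same
# instance again (for example after Clear) produces the same text.
execution_date = datetime(2020, 7, 14, 6, 0)
first = render_ds(QUERY_TEMPLATE, execution_date)
second = render_ds(QUERY_TEMPLATE, execution_date)
print(first)
```

This is what makes reruns of old task instances reload the day they were originally responsible for.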

The values assigned can be viewed with the Rendered button on each task instance, both for the task that sends the letter and for the task that sends the message.

A complete list of built-in macros for the latest available version is available here: Macros reference

Moreover, with the help of plugins we can declare our own macros, but that's another story.

In addition to the predefined things, we can substitute the values of our own variables (I already used this in the code above). Let's create a couple of things in Admin/Variables.

That's it, you can use them:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

The value can be a scalar, or it can be JSON. In the case of JSON:

bot_config

{
    "bot": {
        "token": "881hskdfASDA16641",
        "name": "Verter"
    },
    "service": "TG"
}

just use the path to the desired key: {{ var.json.bot_config.bot.token }}.
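What the dotted path does can be sketched without Airflow: the stored text is parsed as JSON and then walked key by key. The helper lookup below is hypothetical and only mimics the idea behind var.json; the variable content is the bot_config example from above.

```python
import json
from functools import reduce

# Text of the bot_config variable as it would be stored in Admin/Variables.
raw_value = """
{
    "bot": {"token": "881hskdfASDA16641", "name": "Verter"},
    "service": "TG"
}
"""


def lookup(raw_json: str, dotted_path: str):
    """Parse the JSON text and walk it key by key along a dotted path."""
    return reduce(lambda node, key: node[key],
                  dotted_path.split("."),
                  json.loads(raw_json))


print(lookup(raw_value, "bot.token"))
print(lookup(raw_value, "service"))
```

Note that the JSON must be valid for this to work, so string values such as the token need quotes.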

I'll say literally one word and show one screenshot about connections. Everything is elementary here: on the Admin/Connections page we create a connection and add our logins / passwords and more specific parameters there.

Passwords can be encrypted (more thoroughly than by default), and you can leave out the connection type (as I did for tg_main): the list of types is hardwired into the Airflow models and cannot be extended without digging into the source code (if I simply failed to google something, please correct me), but nothing stops us from getting the credentials just by name.

You can also create several connections with the same name: in that case the method BaseHook.get_connection(), which fetches connections by name, returns a random one of the namesakes (Round Robin would be more logical, but let's leave that on the conscience of the Airflow developers).
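The "random namesake" behaviour can be pictured with a toy registry. This is a sketch under assumptions, not Airflow code: the registry, the conn_id orders_source and the host names are invented, and get_connection here is a local function that merely imitates the described behaviour.

```python
import random
from collections import defaultdict

# Toy registry: several connection records may share one conn_id.
registry = defaultdict(list)
for host in ("mssql-a.local", "mssql-b.local", "mssql-c.local"):
    registry["orders_source"].append({"conn_id": "orders_source", "host": host})


def get_connection(conn_id: str) -> dict:
    """Pick one of the records registered under conn_id at random."""
    candidates = registry.get(conn_id)
    if not candidates:
        raise KeyError(f"connection {conn_id!r} is not defined")
    return random.choice(candidates)


# Repeated lookups spread over the namesakes, but without any ordering guarantee.
picked_hosts = {get_connection("orders_source")["host"] for _ in range(200)}
print(sorted(picked_hosts))
```

Random choice gives a crude form of load spreading, but unlike Round Robin it makes no promise that consecutive calls hit different hosts.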

Variables and Connections are certainly cool tools, but it is important not to lose the balance between the parts of your flows that you keep in the code itself and the parts that you hand over to Airflow for storage. On the one hand, it can be convenient to change a value quickly through the UI, for example a mailing address. On the other hand, this is still a return to the mouse click that we (I) wanted to get rid of.

Working with connections is one of the jobs of hooks. In general, Airflow hooks are points for connecting it to third-party services and libraries. For example, JiraHook opens a client for interacting with Jira (you can move tasks back and forth), and with SambaHook you can push a local file to an smb share.

Parsing the custom operator

And now we have come close to looking at how TelegramBotSendMessage is made.

The code of commons/operators.py with the operator itself:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Here, as with everything else in Airflow, everything is very simple:

  • We inherit from BaseOperator, which implements quite a few Airflow-specific things (take a look at your leisure).
  • We declare template_fields, the fields in which Jinja will look for macros to process.
  • We arrange the right arguments for __init__() and set defaults where needed.
  • We don't forget to initialize the ancestor either.
  • We open the corresponding hook, TelegramBotHook, and get a client object from it.
  • We override BaseOperator.execute(), the method Airflow will call when the time comes to launch the operator. In it we implement the main action, not forgetting to log. (We log, by the way, straight to stdout and stderr - Airflow intercepts everything, wraps it nicely and files it where it belongs.)

Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

I don't even know what to explain here, so I'll just note the important points:

  • We inherit and think about the arguments - in most cases there will be just one: conn_id;
  • We override the standard methods: I limited myself to get_conn(), in which I get the connection parameters by name and take only the extra section (a JSON field), where I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • I create an instance of our TelegramBot, giving it that token.

That's all. You can get a client from the hook using TelegramBotHook().client or TelegramBotHook().get_conn().

And here is the second part of the file, in which I make a micro-wrapper for the Telegram REST API, so as not to drag in the whole python-telegram-bot for the sake of one sendMessage method.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

The correct way is to bundle it all - TelegramBotSendMessage, TelegramBotHook, TelegramBot - into a plugin, put it in a public repository and give it to Open Source.

While we were studying all this, our report updates managed to fail successfully and send me an error message in the channel. I'll go and check whether something is really wrong...

Something broke in our DAG! Isn't that what we were expecting? Exactly!

So, are you going to load anything?

Do you feel I missed something? It seems he promised to transfer data from SQL Server to Vertica, and then went off on a tangent, the scoundrel!

This atrocity was intentional: I simply had to decipher some terminology for you. Now we can go further.

Our plan was this:

  1. Make a DAG
  2. Generate tasks
  3. See how beautiful everything is
  4. Assign session numbers to the loads
  5. Get data from SQL Server
  6. Put the data into Vertica
  7. Collect statistics

So, to get all this up and running, I made a small addition to our docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

There we bring up:

  • Vertica as the host dwh with the most default settings,
  • three instances of SQL Server,
  • we fill the databases in the latter with some data (under no circumstances look into mssql_init.py!)

We launch all this goodness with a slightly more complicated command than last time:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

You can look at what our miracle randomizer generated using the Data Profiling/Ad Hoc Query item.

The main thing is not to show it to the analysts

I won't dwell on ETL sessions, everything is trivial there: we make a database, there is a table in it, we wrap everything in a context manager, and now we do this:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load workflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any((exc_type, exc_val, exc_tb)):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

The time has come to collect our data from our hundred and fifty tables. Let's do it with a few very unpretentious lines:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Using a hook, we get a pymssql connection from Airflow.
  2. We substitute a date restriction into the query - the template engine will pass it into the function.
  3. We feed our query to pandas, which returns a DataFrame - it will come in handy later.

I use the {dt} substitution instead of the query parameter %s not because I am an evil Pinocchio, but because pandas cannot cope with pymssql and hands it params as a List, although it really wants a tuple.
Also note that the pymssql developer has decided to stop supporting it, so it is time to move to pyodbc.
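Since the date ends up in the SQL text through an f-string rather than a bound parameter, it may be worth checking first that it really is a date. A minimal sketch, assuming dt arrives as a YYYY-MM-DD string like Airflow's {{ ds }}; safe_ds and build_query are hypothetical helper names, not part of the code above:

```python
from datetime import date


def safe_ds(dt: str) -> str:
    """Return dt normalised to YYYY-MM-DD, or raise ValueError if it is not a date."""
    # date.fromisoformat rejects anything that is not an ISO date,
    # so quotes or SQL fragments cannot slip into the query text.
    return date.fromisoformat(dt).isoformat()


def build_query(dt: str) -> str:
    """Build the daily extraction query with a validated date literal."""
    return f"""
    SELECT id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE CONVERT(DATE, start_time) = '{safe_ds(dt)}'
    """
```

With a check like this, a malformed value fails the task before any query reaches the source server.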

Let's see what Airflow put into our function's arguments:

[Screenshot: the task arguments as rendered by Airflow]

If there is no data, there is no point in going on. But it is also odd to count the load as successful. Yet it is not an error either. A-a-ah, what to do?! Here is what:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException tells Airflow that there is no error, but that we are skipping the task. In the interface the square will be neither green nor red, but pink.

Let's add a few columns to our data:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Namely:

  • The database we took the orders from,
  • The ID of our load session (it will be different for each task),
  • A hash of the source and the order ID - so that in the final database (where everything is poured into one table) we have a unique order ID.
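To show the idea behind the third column without pandas, here is a minimal sketch that derives a stable integer key from the source name and the order ID. It is a stand-in for hash_pandas_object, not a reimplementation of it: the values will differ, and make_hash_id is a hypothetical name.

```python
import hashlib


def make_hash_id(etl_source: str, order_id: int) -> int:
    """Derive a stable 63-bit integer key from the source name and the order id."""
    # The separator keeps ('db1', 23) and ('db12', 3) from producing the same input.
    payload = f'{etl_source}|{order_id}'.encode('utf-8')
    digest = hashlib.sha256(payload).digest()
    # Keep 8 bytes and clear the top bit so the value fits a signed 64-bit integer.
    return int.from_bytes(digest[:8], 'big') & 0x7FFF_FFFF_FFFF_FFFF
```

The same order ID coming from two different source databases then gets two different keys, which is exactly what the shared target table needs.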

The penultimate step remains: pour everything into Vertica. And, oddly enough, one of the most spectacular and efficient ways to do this is through CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({', '.join(df.columns)})
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. We create a special receiver, StringIO.
  2. pandas kindly puts our DataFrame into it as CSV lines.
  3. We open a connection to our beloved Vertica with a hook.
  4. And now, with the help of copy(), we send our data straight to Vertica!
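The first two steps can be tried without pandas or Vertica at hand. A minimal sketch with the standard csv module, assuming rows are plain tuples and None marks a missing value; rows_to_csv_buffer is a hypothetical helper name:

```python
import csv
from io import StringIO


def rows_to_csv_buffer(rows, null_marker='NUL'):
    """Write rows into an in-memory '|'-delimited CSV buffer, ready for a bulk load."""
    buffer = StringIO()
    writer = csv.writer(buffer, delimiter='|', quoting=csv.QUOTE_MINIMAL,
                        doublequote=False, escapechar='\\', lineterminator='\n')
    for row in rows:
        # Replace missing values with the marker the COPY statement declares as NULL.
        writer.writerow([null_marker if value is None else value for value in row])
    buffer.seek(0)  # rewind so the consumer reads from the start
    return buffer
```

The delimiter, the NULL marker and the escape character have to match what the COPY statement declares, otherwise the load is rejected or, worse, lands in the wrong columns.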

We take from the driver how many rows were loaded, and tell the session manager that everything is OK:

session.loaded_rows = cursor.rowcount
session.successful = True

That's all.

In production we create the target table by hand. Here I allowed myself a bit of automation:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

With VerticaOperator() I create the database schema and the table (if they do not exist yet, of course). The main thing is to arrange the dependencies correctly:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load
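The shape this loop produces is a fan-out: one create_target task upstream of every load task. The sketch below only illustrates that ordering with the standard graphlib module. It is not how the Airflow scheduler works, and the schema names are made up.

```python
from graphlib import TopologicalSorter

# Hypothetical schema names standing in for the entries of sql_server_ds.
schemas = ['orders_db_01', 'orders_db_02', 'orders_db_03']

# Map every load task to the set of tasks it waits for,
# mirroring `create_table >> load` inside the loop.
dependencies = {schema: {'create_target'} for schema in schemas}

sorter = TopologicalSorter(dependencies)
run_order = list(sorter.static_order())
```

Whatever order the load tasks come out in, create_target is always first, and the loads do not depend on each other, so they are free to run in parallel.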

Summing up

- Well, - said the little mouse, - isn't it so, now
Are you convinced that I am the scariest beast in the forest?

Julia Donaldson, The Gruffalo

I think that if my colleagues and I held a competition to see who could create and launch an ETL process from scratch faster, they with their SSIS and a mouse and me with Airflow... And then we also compared ease of maintenance... Wow, I think you will agree that I would beat them on all fronts!

To be a little more serious, Apache Airflow, by describing processes as program code, has made my work much more comfortable and enjoyable.

Its unlimited extensibility, both in terms of plugins and in its predisposition to scaling, lets you use Airflow in almost any area: from the full cycle of collecting, preparing and processing data to launching rockets (to Mars, of course).

Final part: reference and information

The rakes we have stepped on for you

  • start_date. Yes, this is already a local meme. Everyone trips over the DAG's main argument, start_date. In short, if you set start_date to the current date and schedule_interval to one day, the DAG will start tomorrow and no earlier.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    And no more problems.

    There is another runtime error associated with it: Task is missing the start_date parameter, which most often means that you forgot to bind the operator to the DAG.

  • Everything on one machine. Yes, the databases (Airflow's own and our wrapper's), the web server, the scheduler, and the workers. And it even worked. But over time the number of tasks for the services grew, and when PostgreSQL started answering an index query in 20 s instead of 5 ms, we picked it up and moved it elsewhere.
  • LocalExecutor. Yes, we are still sitting on it, and we have already reached the edge of the abyss. LocalExecutor has been enough for us so far, but now it is time to expand by at least one worker, and we will have to work hard to move to CeleryExecutor. And since you can run it on a single machine, nothing stops you from using Celery even on a server that "of course will never go to production, honestly!"
  • Not using the built-in tools:
    • Connections for storing service credentials,
    • SLA Misses for reacting to tasks that did not finish on time,
    • XCom for exchanging metadata (I said metadata!) between DAG tasks.
  • Mail abuse. Well, what can I say? Alerts were set up for every retry of failed tasks. Now my work Gmail holds more than 90k emails from Airflow, and the webmail refuses to select and delete more than 100 at a time.
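The start_date rake from the first item is easier to remember with the arithmetic in front of you. A minimal sketch, assuming schedule_interval is given as a timedelta; first_trigger_time is a hypothetical helper, not an Airflow function:

```python
from datetime import datetime, timedelta


def first_trigger_time(start_date: datetime, schedule_interval: timedelta) -> datetime:
    """Earliest moment the scheduler can launch the first run.

    A run covers the period [start_date, start_date + interval) and
    is launched only after that period has ended.
    """
    return start_date + schedule_interval


start_date = datetime(2020, 7, 7, 0, 1, 2)
first_run = first_trigger_time(start_date, timedelta(days=1))
```

Here first_run falls on July 8, a full interval after start_date, which is why a DAG "starting today" does nothing until tomorrow.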

More pitfalls: Apache Airflow Pitfalls

More automation tools

So that we can work even more with our heads and less with our hands, Airflow has prepared the following for us:

  • REST API - it still has Experimental status, which does not stop it from working. With it you can not only get information about DAGs and tasks, but also stop/start a DAG, create a DAG Run or a pool.
  • CLI - the command line offers many tools that are not merely inconvenient to use through the WebUI but are missing from it altogether. For example:
    • backfill is needed to re-run task instances.
      For example, the analysts came and said: "Comrade, you have nonsense in the data from January 1 to 13! Fix it, fix it, fix it, fix it!" And you go:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Database maintenance: initdb, resetdb, upgradedb, checkdb.
    • run, which lets you run a single task instance, and even ignore all dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
    • test does much the same, except that it writes nothing to the databases.
    • connections allows bulk creation of connections from the shell.
  • Python API - a rather hardcore way of interacting, intended for plugins rather than for poking around in it with your bare hands. But who is going to stop us from going to /home/airflow/dags, launching ipython and starting to mess around? You can, for example, export all connections with the following code:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
        d = {field: getattr(conn, field) for field in fields}
        print(conn.conn_id, '=', d)
  • Connecting to the Airflow metadatabase. I do not recommend writing to it, but pulling task states for various specific metrics can be much faster and easier than through any of the APIs.

    Let's say that not all of our tasks are idempotent, and that they sometimes fail, which is normal. But several failures in a row are already suspicious, and it would be worth checking.

    Beware, SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0
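What the query counts is the length of the latest unbroken run of unsuccessful attempts per task. The same logic reads more easily in plain Python. A minimal sketch, assuming the rows have already been fetched and limited to the last two days; latest_failure_streaks is a hypothetical name:

```python
from collections import defaultdict

UNSUCCESSFUL = {'failed', 'up_for_retry'}


def latest_failure_streaks(task_instances):
    """Count how many of the most recent runs of each task failed in a row.

    task_instances: iterable of (dag_id, task_id, execution_date, state) tuples.
    Returns {(dag_id, task_id): streak_length} for tasks whose streak is > 0.
    """
    by_task = defaultdict(list)
    for dag_id, task_id, execution_date, state in task_instances:
        by_task[(dag_id, task_id)].append((execution_date, state))

    streaks = {}
    for key, runs in by_task.items():
        streak = 0
        # Walk from the newest run backwards and stop at the first non-failure.
        for _, state in sorted(runs, key=lambda run: run[0], reverse=True):
            if state not in UNSUCCESSFUL:
                break
            streak += 1
        if streak:
            streaks[key] = streak
    return streaks
```

A task that failed once and then recovered does not show up at all, while a task whose last few runs all failed shows up with the length of that streak.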

Links

And of course, the first ten links from Google's search results are the contents of the Airflow folder in my bookmarks.

And the links used in the article:

Source: www.habr.com
