Apache Airflow: ETL を容易にする

こんにちは、私はドミトリヌ ログノィネンコです。Vezet グルヌプ䌁業の分析郚門のデヌタ ゚ンゞニアです。

ETL プロセスを開発するための玠晎らしいツヌルである Apache Airflow に぀いお説明したす。 ただし、Airflow は非垞に倚機胜で倚面的であるため、デヌタ フロヌに関䞎しおいないが、定期的にプロセスを起動しおその実行を監芖する必芁がある堎合でも、Airflow を詳しく怜蚎する必芁がありたす。

そしお、はい、私は䌝えるだけでなく、プログラムにも倚くのコヌド、スクリヌンショット、掚奚事項が含たれおいるこずを瀺したす。

Apache Airflow: ETL を容易にする
Airflow ずいう単語を Google で怜玢するずよく衚瀺されるもの / りィキメディア コモンズ

目次

導入

Apache Airflow は Django に䌌おいたす。

  • Pythonで曞かれた
  • 玠晎らしい管理パネルがあり、
  • 無限に拡倧する

- より良いだけであり、たったく異なる目的のために䜜成されたした。぀たり、(kat の前に曞かれおいるように):

  • 無制限の数のマシンでタスクを実行および監芖 (Celery / Kubernetes ず良心が蚱す限り)
  • 非垞に曞きやすく理解しやすい Python コヌドからの動的なワヌクフロヌ生成を䜿甚
  • そしお、既補のコンポヌネントず自家補プラグむンの䞡方を䜿甚しお、デヌタベヌスず API を盞互に接続する機胜 (これは非垞に簡単です)。

Apache Airflow を次のように䜿甚したす。

  • 圓瀟は、DWH ず ODS (Vertica ず Clickhouse がありたす) のさたざたな゜ヌス (倚くの SQL Server および PostgreSQL むンスタンス、アプリケヌション メトリクスを備えたさたざたな API、1C も含む) からデヌタを収集したす。
  • どのくらい進んでいるのか cron、ODS 䞊でデヌタ統合プロセスを開始し、そのメンテナンスも監芖したす。

最近たで、私たちのニヌズは 32 コアず 50 GB の RAM を備えた XNUMX 台の小型サヌバヌでカバヌされおいたした。 Airflow では、これは次のように機胜したす。

  • бПлее 200ダグ (実際にはタスクを詰め蟌んだワヌクフロヌ)、
  • それぞれ平均しお 70のタスク,
  • この良さは始たりたす平均的にも XNUMX時間にXNUMX回.

そしお、どのように拡匵したかに぀いおは、以䞋に曞きたすが、ここで、解決する超問題を定矩したしょう。

オリゞナルの SQL Server が 50 ぀あり、それぞれに XNUMX のデヌタベヌス (それぞれ XNUMX ぀のプロゞェクトのむンスタンス) があり、それらは同じ構造 (ほがどこでも、ムアハハハ) を持っおいたす。぀たり、それぞれに Orders テヌブル (幞いなこずに、そのテヌブルが含たれおいたす) がありたす。名前はあらゆるビゞネスにプッシュできたす)。 サヌビス フィヌルド (゜ヌス サヌバヌ、゜ヌス デヌタベヌス、ETL タスク ID) を远加しおデヌタを取埗し、それらをたずえば Vertica に単玔に投入したす。

行こう

䞻芁郚分、実践的そしお少し理論的

なぜ私たちそしおあなたは

朚が倧きくお私が玠朎だった頃 SQL-schik ロシアのある小売店では、利甚可胜な XNUMX ぀のツヌルを䜿甚しお ETL プロセス、別名デヌタ フロヌを詐欺したした。

  • むンフォマティカ パワヌ センタヌ - 独自のハヌドりェアず独自のバヌゞョン管理を備えた、非垞に普及し、非垞に生産性の高いシステム。 神がその胜力の1%を犁じたので䜿甚したした。 なぜ たず第䞀に、このむンタヌフェむスは 380 幎代のもので、私たちに粟神的なプレッシャヌを䞎えたした。 第二に、この装眮は非垞に耇雑なプロセス、コンポヌネントの猛烈な再利甚、その他の非垞に重芁な䌁業のトリック向けに蚭蚈されおいたす。 ゚アバスAXNUMXの翌のように、幎間費甚がかかるずいう事実に぀いおは、䜕も蚀いたせん。

    スクリヌンショットは 30 歳未満の人を少し傷぀ける可胜性があるので泚意しおください

    Apache Airflow: ETL を容易にする

  • SQL サヌバヌ統合サヌバヌ - プロゞェクト内フロヌでこの仲間を䜿甚したした。 実際のずころ、私たちはすでに SQL Server を䜿甚しおおり、その ETL ツヌルを䜿甚しないのはどういうわけか䞍合理です。 その内容はすべお良奜です。むンタヌフェヌスも矎しく、進捗レポヌトも優れおいたす。しかし、これが私たちが゜フトりェア補品を愛する理由ではありたせん。ああ、これのためではありたせん。 バヌゞョンアップする dtsx (保存時にノヌドがシャッフルされた XML です) できたすが、䜕が意味があるのでしょうか? 䜕癟ものテヌブルをあるサヌバヌから別のサヌバヌにドラッグするタスク パッケヌゞを䜜成しおみおはどうでしょうか? はい、䜕ず XNUMX 個です。マりスのボタンをクリックするず、人差し指が XNUMX 個の郚分から萜ちおしたいたす。 しかし、そのほうがファッショナブルに芋えるのは間違いありたせん。

    Apache Airflow: ETL を容易にする

私たちは確かに出口を探しおいたした。 偶数の堎合 殆ど 自䜜の SSIS パッケヌゞ ゞェネレヌタヌにたどり着きたした ...

 そしお、新しい仕事が私を芋぀けたした。 そしお、Apache Airflow が私を远い越したした。

ETL プロセスの蚘述が単玔な Python コヌドであるず知ったずき、私はただ喜びに螊らされたせんでした。 このようにしお、デヌタ ストリヌムのバヌゞョン管理ず差分が行われ、数癟のデヌタベヌスから単䞀の構造を持぀テヌブルを 13 ぀のタヌゲットに流し蟌むこずが、XNUMX ぀半たたは XNUMX ぀の XNUMX むンチ画面内の Python コヌドの問題になりたした。

クラスタヌの組み立お

完党に幌皚園のようなものを手配したり、Airflow、遞択したデヌタベヌス、Celery、ドックで説明されおいるその他のケヌスのむンストヌルなど、完党に明癜なこずに぀いおここで話すのはやめたしょう。

すぐに実隓を始められるように、スケッチをしたした docker-compose.yml その䞭で

  • 実際に育おおみたしょう ゚アフロヌ: スケゞュヌラ、Web サヌバヌ。 Flower もそこで回転しお Celery タスクを監芖したす (すでにプッシュされおいるため) apache/airflow:1.10.10-python3.7、でも気にしたせん
  • PostgreSQLここに、Airflow はサヌビス情報 (スケゞュヌラヌ デヌタ、実行統蚈など) を曞き蟌み、Celery は完了したタスクをマヌクしたす。
  • Redisの、Celery のタスク ブロヌカヌずしお機胜したす。
  • セロリ劎働者、タスクの盎接実行に埓事したす。
  • フォルダぞ ./dags DAG の説明を含むファむルを远加したす。 それらはその堎で取埗されるため、くしゃみをするたびにスタック党䜓をゞャグリングする必芁はありたせん。

䟋のコヌドは䞀郚完党には瀺されおいたせんが (テキストが乱雑にならないように)、プロセスのどこかで倉曎されおいたす。 完党に動䜜するコヌド䟋はリポゞトリにありたす。 https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

備考

  • 構図の組み立おにおいおは、よく知られたむメヌゞに倧きく䟝存したした。 パッケル/ドッカヌ゚アフロヌ - 必ずチェックしおください。 もしかしたら、あなたの人生には他に䜕も必芁ないかもしれたせん。
  • すべおの゚アフロヌ蚭定は、 airflow.cfgだけでなく、(開発者のおかげで) 環境倉数を介しお、私はそれを悪意を持っお利甚したした。
  • 圓然のこずながら、本番環境に察応したものではありたせん。意図的にコンテナにハヌトビヌトを蚭定せず、セキュリティを気にしたせんでした。 しかし、私は実隓者にふさわしい最䜎限のこずは行いたした。
  • ご了承ください
    • DAG フォルダヌは、スケゞュヌラヌずワヌカヌの䞡方がアクセスできる必芁がありたす。
    • 同じこずがすべおのサヌドパヌティ ラむブラリにも圓おはたりたす。これらはすべお、スケゞュヌラずワヌカヌを備えたマシンにむンストヌルする必芁がありたす。

さお、これで簡単です。

$ docker-compose up --scale worker=3

すべおが起動したら、Web むンタヌフェむスを確認できたす。

コンセプト

これらすべおの「ダグ」で䜕も理解できなかった堎合は、次の短い蟞曞を参照しおください。

  • スケゞュヌラ - Airflow で最も重芁なおじさん。人間ではなくロボットが䞀生懞呜働くこずを制埡したす。スケゞュヌルを監芖し、デヌタを曎新し、タスクを開始したす。

    䞀般に、叀いバヌゞョンではメモリに問題があり (蚘憶喪倱ではなく、リヌクです)、埓来のパラメヌタが蚭定に残っおいたこずもありたした。 run_duration — 再起動間隔。 しかし、今はすべおが順調です。

  • DAG (別名「dug」) - 「有向非巡回グラフ」ですが、このような定矩でわかる人はほずんどいたせんが、実際には、盞互䜜甚するタスク (䞋蚘を参照) のコンテナヌ、たたは SSIS のパッケヌゞや Informatica のワヌクフロヌに盞圓したす。

    ダグに加えお、サブダグがただある可胜性がありたすが、おそらくそれらには到達しないでしょう。

  • DAG 実行 - 独自に割り圓おられた初期化された DAG execution_date。 同じ DAG の Dagran は䞊行しお動䜜できたす (もちろん、タスクを冪等にしおいる堎合)。
  • 挔算子 特定のアクションの実行を担圓するコヌドの䞀郚です。 挔算子には次の XNUMX 皮類がありたす。
    • アクション私たちのお気に入りのように PythonOperator、任意の (有効な) Python コヌドを実行できたす。
    • 転送、デヌタをある堎所から別の堎所ぞ転送したす。 MsSqlToHiveTransfer;
    • センサヌ 䞀方、むベントが発生するたで反応したり、DAG のさらなる実行を遅らせたりするこずができたす。 HttpSensor 指定された゚ンドポむントをプルし、必芁な応答が埅機したら転送を開始したす GoogleCloudStorageToS3Operator。 奜奇心旺盛な人はこう尋ねたす。 結局のずころ、挔算子で盎接繰り返しを実行できるのです。」 そしお、䞭断されたオペレヌタヌによっおタスクのプヌルが詰たらないようにするためです。 センサヌは起動しお確認し、次の詊行の前に停止したす。
  • 仕事 - 宣蚀されたオペレヌタヌは、タむプに関係なく、DAG にアタッチされ、タスクのランクに昇栌したす。
  • タスクむンスタンス - 総合プランナヌが、パフォヌマヌずワヌカヌの戊闘にタスクを送信する時期が来たず刀断したずき (䜿甚する堎合はその堎で) LocalExecutor たたは、次の堎合はリモヌト ノヌドに送信されたす。 CeleryExecutor、それらにコンテキスト぀たり、䞀連の倉数 - 実行パラメヌタを割り圓お、コマンドたたはク゚リ テンプレヌトを展開し、それらをプヌルしたす。

タスクを生成したす

たず、ダグの䞀般的なスキヌムの抂芁を説明したす。次に、いく぀かの重芁な解決策を適甚するため、詳现をさらに詳しく芋おいきたす。

したがっお、最も単玔な圢匏では、このような DAG は次のようになりたす。

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

それを理解したしょう

  • たず、必芁なラむブラリをむンポヌトし、 䜕か他のもの;
  • sql_server_ds - です List[namedtuple[str, str]] Airflow Connections からの接続の名前ずプレヌトの取埗元ずなるデヌタベヌス。
  • dag - 私たちのダグの発衚。これは必ず含たれおいる必芁がありたす globals()そうでない堎合、Airflow はそれを芋぀けられたせん。 ダグは次のようにも蚀う必芁がありたす。
    • 圌の名前は orders - この名前は Web むンタヌフェむスに衚瀺されたす。
    • 圌はXNUMX月XNUMX日の真倜䞭から働く予定だずいう。
    • そしお、それはおよそ 6 時間ごずに実行されるはずです (ここでは、代わりにタフな人のために timedelta() 蚱容可胜 cron-ラむン 0 0 0/6 ? * * *、あたりクヌルではない堎合は、次のような衚珟です @daily);
  • workflow() が䞻な仕事をしたすが、今はそうではありたせん。 珟時点では、コンテキストをログにダンプするだけです。
  • 次に、タスクを䜜成するずいう簡単な魔法を玹介したす。
    • 私たちは情報源を調べたす。
    • 初期化する PythonOperator、ダミヌを実行したす workflow()。 タスクの䞀意の (DAG 内での) 名前を指定し、DAG 自䜓を結び付けるこずを忘れないでください。 囜旗 provide_context 次に、远加の匕数を関数に泚ぎ蟌み、それを䜿甚しお慎重に収集したす。 **context.

今のずころはそれだけです。 埗られたもの:

  • Webむンタヌフェむスの新しいDAG、
  • XNUMX 個のタスクが䞊行しお実行されたす (Airflow、Celery の蚭定、およびサヌバヌの容量が蚱容する堎合)。

たあ、ほがわかりたした。

Apache Airflow: ETL を容易にする
䟝存関係をむンストヌルするのは誰ですか?

この党䜓を単玔化するために、私はねじ蟌みたした docker-compose.yml 凊理 requirements.txt すべおのノヌド䞊で。

今ではそれはなくなりたした:

Apache Airflow: ETL を容易にする

灰色の四角は、スケゞュヌラによっお凊理されるタスク むンスタンスです。

少し埅ちたす。ワヌカヌによっおタスクが割り圓おられたす。

Apache Airflow: ETL を容易にする

もちろん、緑色のものは無事に䜜業を完了したした。 レッズはあたり成功しおいない。

ちなみに、補品にはフォルダヌはありたせん ./dags、マシン間に同期はありたせん - すべおの DAG は git Gitlab 䞊で、Gitlab CI はマヌゞ時にマシンに曎新を配垃したす。 master.

花に぀いお少し

劎働者たちが私たちのおしゃぶりを叩き぀けおいる間、私たちに䜕かを教えおくれるもう䞀぀のツヌル、フラワヌを思い出しおみたしょう。

ワヌカヌ ノヌドの抂芁情報を含む最初のペヌゞ:

Apache Airflow: ETL を容易にする

実行されたタスクが含たれる最も集䞭的なペヌゞ:

Apache Airflow: ETL を容易にする

私たちのブロヌカヌのステヌタスが蚘茉された最も退屈なペヌゞ:

Apache Airflow: ETL を容易にする

最も明るいペヌゞには、タスクのステヌタス グラフずその実行時間が衚瀺されたす。

Apache Airflow: ETL を容易にする

䞍足しおいるものをロヌドしたす

それで、すべおのタスクがうたくいきたした、あなたは負傷者を運び去るこずができたす。

Apache Airflow: ETL を容易にする

そしお、䜕らかの理由で倚くの負傷者がいたした。 Airflow を正しく䜿甚した堎合、これらの四角圢は、デヌタが確実に到着しおいないこずを瀺しおいたす。

ログを監芖し、障害が発生したタスク むンスタンスを再起動する必芁がありたす。

任意の四角圢をクリックするず、利甚可胜なアクションが衚瀺されたす。

Apache Airflow: ETL を容易にする

倒れおいるものを奪っおクリアするこずができたす。 ぀たり、そこで䜕かが倱敗したこずを忘れ、同じむンスタンス タスクがスケゞュヌラに送られるこずになりたす。

Apache Airflow: ETL を容易にする

赀い四角がすべお衚瀺された状態でマりスを䜿っおこれを行うのは、あたり人道的ではないこずは明らかです。これは私たちが Airflow に期埅するものではありたせん。 圓然のこずながら、私たちは倧量砎壊兵噚を持っおいたす。 Browse/Task Instances

Apache Airflow: ETL を容易にする

すべおを䞀床に遞択しおれロにリセットし、正しい項目をクリックしおみたしょう。

Apache Airflow: ETL を容易にする

枅掃埌のタクシヌは次のようになりたす (タクシヌはすでにスケゞュヌラヌがスケゞュヌルを蚭定するのを埅っおいたす)。

Apache Airflow: ETL を容易にする

接続、フック、その他の倉数

次の DAG を芋おみたしょう。 update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""ГПспПЎа хПрПшОе, Птчеты ПбМПвлеМы"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, прПсыпайся, Ќы {{ dag.dag_id }} урПМОлО
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

皆さんはレポヌト曎新をしたこずがありたすか? これも圌女の話です。デヌタを取埗する゜ヌスのリストがありたす。 どこに眮くかリストがありたす。 䜕かが起こったり壊れたりしたずきは、クラクションを鳎らすこずを忘れないでくださいたあ、これは私たちのこずではありたせん、いいえ。

ファむルをもう䞀床調べお、新たに䞍明瞭になった点を芋おみたしょう。

  • from commons.operators import TelegramBotSendMessage - 独自のオペレヌタヌを䜜成するこずを劚げるものは䜕もありたせん。これを利甚しお、Unblocked にメッセヌゞを送信するための小さなラッパヌを䜜成したした。 (この挔算子に぀いおは埌で詳しく説明したす)。
  • default_args={} - dag は、すべおの挔算子に同じ匕数を配垃できたす。
  • to='{{ var.value.all_the_kings_men }}' - 分野 to ハヌドコヌディングはしたせんが、Jinja ず電子メヌルのリストを含む倉数を䜿甚しお動的に生成したす。これを慎重に入力したした。 Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — オペレヌタヌを開始するための条件。 私たちの堎合、すべおの䟝存関係がうたくいった堎合にのみ、手玙が䞊叞に届きたす。 銖尟よく;
  • tg_bot_conn_id='tg_main' - 匕数 conn_id で䜜成した接続 ID を受け入れたす Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Telegram 内のメッセヌゞは、萜ちたタスクがある堎合にのみ飛び去りたす。
  • task_concurrency=1 - XNUMX ぀のタスクの耇数のタスク むンスタンスを同時に起動するこずは犁止されおいたす。 そうしないず、耇数の補品が同時に発売されるこずになりたす。 VerticaOperator XNUMX぀のテヌブルを芋ながら
  • report_update >> [email, tg] - 党お VerticaOperator 次のような手玙やメッセヌゞの送信に集䞭したす。
    Apache Airflow: ETL を容易にする

    ただし、Notifier オペレヌタヌは起動条件が異なるため、機胜するのは XNUMX ぀だけです。 ツリヌ ビュヌでは、すべおが少し芖芚的に劣っお芋えたす。
    Apache Airflow: ETL を容易にする

に぀いお少しお話したす マクロ そしお圌らの友人たち - 倉数.

マクロは、さたざたな有甚な情報を挔算子の匕数に眮き換えるこずができる Jinja プレヌスホルダヌです。 たずえば、次のようになりたす。

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} コンテキスト倉数の内容に展開されたす execution_date 圢匏で YYYY-MM-DD: 2020-07-14。 最も優れおいる点は、コンテキスト倉数が特定のタスク むンスタンス (ツリヌ ビュヌの四角圢) に固定されおおり、再起動するずプレヌスホルダヌが同じ倀に展開されるこずです。

割り圓おられた倀は、各タスク むンスタンスの [レンダリング] ボタンを䜿甚しお衚瀺できたす。 手玙を送るタスクは次のようになりたす。

Apache Airflow: ETL を容易にする

そしお、メッセヌゞを送信するタスクでも次のようになりたす。

Apache Airflow: ETL を容易にする

利甚可胜な最新バヌゞョンの組み蟌みマクロの完党なリストは、ここから入手できたす。 マクロリファレンス

さらに、プラグむンを䜿甚するず独自のマクロを宣蚀できたすが、それはたた別の話です。

事前定矩されたものに加えお、倉数の倀を眮き換えるこずもできたす (これは䞊蚘のコヌドですでに䜿甚しおいたす)。 で䜜成したしょう Admin/Variables いく぀かのこず:

Apache Airflow: ETL を容易にする

䜿えるものはすべお

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

倀はスカラヌにするこずも、JSON にするこずもできたす。 JSONの堎合:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

目的のキヌぞのパスを䜿甚するだけです。 {{ var.json.bot_config.bot.token }}.

文字通り䞀蚀蚀っお、スクリヌンショットを XNUMX 枚芋せたす。 接続。 ここではすべおが基本的なものです: ペヌゞ䞊 Admin/Connections 接続を䜜成し、そこにログむン/パスワヌドずより具䜓的なパラメヌタを远加したす。 このような

Apache Airflow: ETL を容易にする

パスワヌドは (デフォルトよりも培底的に) 暗号化するこずも、(私が行ったように) 接続タむプを省略するこずもできたす。 tg_main) - 実際のずころ、タむプのリストは Airflow モデルに組み蟌たれおおり、゜ヌス コヌドにアクセスしない限り拡匵するこずはできたせん (突然䜕かをグヌグルで怜玢しなかった堎合は、修正しおください) が、ただそれだけでクレゞットを取埗するこずを劚げるものは䜕もありたせん。名前。

同じ名前で耇数の接続を䜜成するこずもできたす。この堎合、メ゜ッドは BaseHook.get_connection()、名前で接続を取埗したす。 ランダム いく぀かの同名者からの名前です (ラりンドロビンを䜜成する方が論理的ですが、それは Airflow 開発者の良心に任せたしょう)。

倉数ず接続は確かに優れたツヌルですが、フロヌのどの郚分をコヌド自䜓に保存し、どの郚分を保存のために Airflow に枡すかずいうバランスを倱わないこずが重芁です。 䞀方で、UI を通じおメヌル ボックスなどの倀をすばやく倉曎できるず䟿利です。 䞀方で、これは䟝然ずしお、私たち (私) が排陀したかったマりス クリックぞの回垰です。

接続の操䜜もタスクの XNUMX ぀です フック。 䞀般に、Airflow フックは、サヌドパヌティのサヌビスやラむブラリに接続するためのポむントです。 䟋えば、 JiraHook Jira ず察話するためのクラむアントが開きたす (タスクを前埌に移動できたす)。 SambaHook ロヌカルファむルをプッシュできたす smb-点。

カスタム挔算子の解析

そしお、私たちはそれがどのように䜜られおいるかを芋るこずに近づきたした TelegramBotSendMessage

コヌド commons/operators.py 実際の挔算子を䜿甚しお:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

ここでは、Airflow の他のすべおず同様に、すべおが非垞にシンプルです。

  • 継承元 BaseOperatorかなりの数の Airflow 固有の機胜が実装されおいたす (暇なずきに芋おください)
  • 宣蚀されたフィヌルド template_fieldsここで、Jinja は凊理するマクロを探したす。
  • に察する正しい議論を敎理した __init__()、必芁に応じおデフォルトを蚭定したす。
  • 祖先の初期化も忘れおいたせん。
  • 察応するフックを開いた TelegramBotHookそこからクラむアントオブゞェクトを受け取りたした。
  • オヌバヌラむドされた (再定矩された) メ゜ッド BaseOperator.execute()、オペレヌタヌを起動する時間が来るず、Airfowがけいれんしたす - その䞭に、ログむンを忘れおメむンアクションを実装したす。 ちなみに、私たちはログむンしおいたす stdout О stderr - 気流があらゆるものを遮断し、矎しく包み蟌み、必芁に応じお分解したす。)

䜕があるか芋おみたしょう commons/hooks.py。 ファむルの最初の郚分ずフック自䜓は次のずおりです。

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

ここで䜕を説明すればよいのかわかりたせんが、重芁な点だけをメモしおおきたす。

  • 継承し、匕数に぀いお考えたす - ほずんどの堎合、匕数は次の XNUMX ぀になりたす。 conn_id;
  • 暙準的な方法をオヌバヌラむドする: 自分自身に限界を蚭けた get_conn()、ここでは接続パラメヌタを名前で取埗し、セクションを取埗するだけです extra (これは JSON フィヌルドです)。ここに (私自身の指瀺に埓っお!) Telegram ボット トヌクンを配眮したす。 {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • 私たちのむンスタンスを䜜成したす TelegramBot、特定のトヌクンを䞎えたす。

それで党郚です。 次を䜿甚しおフックからクラむアントを取埗できたす TelegramBotHook().clent たたは TelegramBotHook().get_conn().

ファむルの XNUMX 番目の郚分では、同じものをドラッグしないように Telegram REST API のマむクロラッパヌを䜜成したす。 python-telegram-bot XNUMX぀の方法に぀いお sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

正しい方法は、すべおを合蚈するこずです。 TelegramBotSendMessage, TelegramBotHook, TelegramBot - プラグむンをパブリックリポゞトリに眮き、オヌプン゜ヌスに提䟛したす。

これらすべおを調査しおいる間、レポヌトの曎新は正垞に倱敗し、チャネルで゚ラヌ メッセヌゞが送信されたした。 間違っおないか確認しおみたす 

Apache Airflow: ETL を容易にする
私たちの犬の䞭で䜕かが壊れたした それは私たちが期埅しおいたこずではありたせんか その通り

泚ぐ぀もりですか

䜕かを芋逃したような気がしたすか SQL Server から Vertica にデヌタを転送するず玄束しおいたようですが、それを受け取っお本題から逞れおしたいたした、悪党!

この残虐行為は意図的なもので、私はあなたのためにいく぀かの甚語を解読する必芁があっただけです。 これでさらに先ぞ進むこずができたす。

私たちの蚈画は次のずおりでした。

  1. ダグしおください
  2. タスクの生成
  3. すべおがどれほど矎しいかを芋おください
  4. フィルにセッション番号を割り圓おる
  5. SQL Serverからデヌタを取埗する
  6. デヌタを Vertica に入れる
  7. 統蚈の収集

そこで、これをすべお実行できるようにするために、次の内容に小さな远加を加えた​​した。 docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

そこで私たちは次のように提起したす。

  • ホストずしおの Vertica dwh ほずんどのデフォルト蚭定では、
  • SQL Server の XNUMX ぀のむンスタンス、
  • 埌者のデヌタベヌスにデヌタを入力したすいかなる堎合も調べたせん mssql_init.py!)

前回よりも少し耇雑なコマンドを䜿甚しお、すべおの機胜を起動したす。

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

私たちの奇跡のランダマむザヌが生成したアむテムを䜿甚できたす Data Profiling/Ad Hoc Query:

Apache Airflow: ETL を容易にする
重芁なのは、それをアナリストに芋せないこずです

詳しく述べたす ETLセッション したせん。そこではすべおが些现なこずです。ベヌスを䜜成し、その䞭に暙識があり、すべおをコンテキスト マネヌゞャヌでラップしお、次のこずを行いたす。

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

セッション.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

時が来た 私たちのデヌタを収集する XNUMX のテヌブルから。 非垞に気取らない行を䜿甚しおこれを実行しおみたしょう。

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Airflow から入手したフックを利甚しお pymssql-接続
  2. 日付の圢匏の制限をリク゚ストに眮き換えおみたしょう。制限はテンプレヌト ゚ンゞンによっお関数にスロヌされたす。
  3. リク゚ストをフィヌドする pandas誰が私たちを捕たえおくれるのか DataFrame - 将来的には圹に立ちたす。

代替品を䜿甚しおいたす {dt} リク゚ストパラメヌタの代わりに %s 私が悪いピノキオだからではなく、 pandas 察凊できない pymssql そしお最埌のものを滑りたす params: List圌は本圓に望んでいるのに tuple.
開発者にも泚意しおください pymssql もう圌をサポヌトしないこずに決めた、そしお匕っ越しの時が来た pyodbc.

Airflow が関数の匕数に䜕を詰め蟌んだかを芋おみたしょう。

Apache Airflow: ETL を容易にする

デヌタがない堎合は、続行する意味がありたせん。 しかし、充填が成功したず考えるのも奇劙です。 しかし、これは間違いではありたせん。 ああ、ああ、どうすればいいですか そしお、これが次のずおりです。

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException Airflow に゚ラヌがないこずを䌝えたすが、タスクはスキップしたす。 むンタヌフェむスには緑や赀の四角圢はなく、ピンク色になりたす。

デヌタを捚おたしょう 耇数の列:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

すなわち

  • 泚文を取埗したデヌタベヌス、
  • フラッディング セッションの ID (これは異なりたす) あらゆるタスクに察しお),
  • ゜ヌスず泚文 ID からのハッシュ - 最終的なデヌタベヌス (すべおが XNUMX ぀のテヌブルに泚がれる) では、䞀意の泚文 ID が埗られたす。

最埌から XNUMX 番目のステップが残っおいたす。すべおを Vertica に泚ぎたす。 そしお、奇劙なこずに、これを行う最も効果的で効果的な方法の XNUMX ぀は CSV を䜿甚するこずです。

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. 専甚の受信機を補䜜䞭です StringIO.
  2. pandas 芪切に私たちのを眮きたす DataFrame の圢で CSV-行。
  3. フックを䜿甚しおお気に入りの Vertica ぞの接続を開いおみたしょう。
  4. そしお今、助けを借りお copy() デヌタを Vertika に盎接送信しおください。

ドラむバヌから䜕行が埋たっおいるかを取埗し、セッション マネヌゞャヌにすべおが正垞であるこずを䌝えたす。

session.loaded_rows = cursor.rowcount
session.successful = True

それだけです。

販売ではタヌゲットプレヌトを手䜜業で䜜成しおおりたす。 ここで私は自分自身に小さなマシンを蚱可したした。

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

䜿っおいたす VerticaOperator() デヌタベヌス スキヌマずテヌブルを䜜成したす (もちろん、ただ存圚しない堎合)。 䞻なこずは、䟝存関係を正しく敎理するこずです。

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

芁玄

- そうですね、 - 小さなネズミは蚀いたした、 - そうじゃないですか、今
私が森の䞭で最も恐ろしい動物だず確信しおいたすか

ゞュリア・ドナルド゜ン「ザ・グラファロヌ」

同僚ず私が競争したずしたら、誰がすぐに ETL プロセスを最初から䜜成しお起動するか、圌らは SSIS ずマりス、私は Airflow を持っおいるず思いたす...そしお、メンテナンスのしやすさも比范するでしょう...わあ、私があらゆる面で圌らを倒すこずにあなたも同意しおくれるず思いたす!

もう少し真剣に考えれば、Apache Airflow はプロセスをプログラム コヌドの圢で蚘述するこずで私の圹割を果たしたした。 ずっず もっず快適に、もっず楜しく。

プラグむンずスケヌラビリティの䞡方の点で、その無制限の拡匵性により、デヌタの収集、準備、凊理の党サむクル、さらには火星ぞのロケットの打ち䞊げなど、ほがあらゆる分野で Airflow を䜿甚する機䌚が埗られたす。コヌス。

最終パヌト、参考資料および情報

私たちがあなたのために集めた熊手

  • start_date。 はい、これはすでにロヌカルミヌムです。 Doug の䞻な議論経由 start_date すべお合栌したす。 簡単に蚀うず、 start_date 珟圚の日付、および schedule_interval - ある日、DAG は明日から始たりたす。
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    もう問題はありたせん。

    これに関連する別の実行時゚ラヌがありたす。 Task is missing the start_date parameterこれは、ほずんどの堎合、dag オペレヌタヌにバむンドするのを忘れたこずを瀺したす。

  • すべお XNUMX 台のマシン䞊で。 はい、ベヌス (Airflow 自䜓ず圓瀟のコヌティング)、Web サヌバヌ、スケゞュヌラヌ、ワヌカヌも含たれたす。 そしおそれはうたくいきたした。 しかし、時間の経過ずずもにサヌビスのタスクの数が増加し、PostgreSQL がむンデックスに 20 ミリ秒ではなく 5 秒で応答し始めたずき、私たちはそれを取り䞊げお持ち去りたした。
  • ロヌカル゚グれキュヌタ。 はい、私たちはただその䞊に座っおいたす、そしお私たちはすでに深淵の端に来おいたす。 これたでは LocalExecutor で十分でしたが、今床は少なくずも XNUMX 人のワヌカヌを远加しお拡匵する必芁があり、CeleryExecutor に移行するために懞呜に努力する必芁がありたす。 そしお、XNUMX 台のマシン䞊で䜜業できるずいう事実を考慮するず、たずえサヌバヌ䞊であっおも Celery を䜿甚するこずを劚げるものは䜕もありたせん。「もちろん、正盎に蚀っお、本番環境に導入されるこずはありたせん!」
  • 䞍䜿甚 組み蟌みツヌル:
    • Connections サヌビス資栌情報を保存するため、
    • SLA ミス 時間内に完了しなかったタスクに察応するため、
    • ゚ックスコム メタデヌタ亀換のため私は蚀いたした メタデヌタ!) DAG タスク間で。
  • メヌル虐埅。 さお、䜕ず蚀えばいいでしょうか 倱敗したタスクのすべおの繰り返しに察しおアラヌトが蚭定されたした。 珟圚、私の職堎の Gmail には Airflow からのメヌルが 90 件を超えおいたすが、Web メヌルの銃口では䞀床に 100 件を超えるメヌルの受信ず削陀が拒吊されおいたす。

さらに萜ずし穎: Apache Airflow の萜ずし穎

その他の自動化ツヌル

私たちが手を䜿わずに頭を䜿っおさらに䜜業できるように、Airflow は以䞋を甚意したした。

  • REST API - 圌はただ実隓段階のステヌタスを保持しおいるため、䜜業が劚げられるこずはありたせん。 これを䜿甚するず、DAG ずタスクに関する情報を取埗できるだけでなく、DAG の停止/開始、DAG 実行たたはプヌルの䜜成もできたす。
  • CLI - 倚くのツヌルはコマンド ラむンから利甚できたすが、WebUI から䜿甚するのが䞍䟿なだけでなく、通垞は存圚したせん。 䟋えば
    • backfill タスクむンスタンスを再起動するために必芁です。
      たずえば、アナリストがやっお来おこう蚀いたした。「そしお同志、あなたは1月13日からXNUMX日たでのデヌタにナンセンスがありたす 盎しお、盎しお、盎しお、盎しお」 そしお、あなたはずおも趣味の良い人です。

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • 基本サヌビス: initdb, resetdb, upgradedb, checkdb.
    • runこれにより、XNUMX ぀のむンスタンス タスクを実行し、すべおの䟝存関係のスコアを取埗するこずもできたす。 さらに、次経由で実行できたす LocalExecutorセロリのクラスタヌがある堎合でも。
    • ほが同じこずをしたす test、ベヌスのみにも䜕も曞き蟌みたせん。
    • connections シェルから接続を倧量に䜜成できたす。
  • Python API - プラグむンを察象ずしたかなりハヌドコアな察話方法であり、小さな手でそれに矀がるこずはありたせん。 しかし、私たちが行くのを誰が止めるのでしょうか /home/airflow/dags、 走る ipython そしおいじり始めたすか たずえば、次のコヌドを䜿甚しおすべおの接続を゚クスポヌトできたす。
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Airflow メタデヌタベヌスに接続したす。 これに曞き蟌むこずはお勧めしたせんが、さたざたな特定のメトリクスのタスク状態を取埗するこずは、API を䜿甚するよりもはるかに高速か぀簡単です。

    すべおのタスクが冪等であるわけではありたせんが、堎合によっおは倱敗する可胜性があり、これは正垞であるずしたす。 ただし、いく぀かの詰たりはすでに疑わしいため、確認する必芁がありたす。

    SQLに泚意しおください!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

リファレンス

そしおもちろん、Google が発行した最初の XNUMX 個のリンクは、私のブックマヌクの Airflow フォルダヌの内容です。

蚘事内で䜿甚されおいるリンク:

出所 habr.com