Apache Airflow:讓 ETL 更簡單

大家好,我是 Dmitry Logvinenko - Vezet 集團公司分析部門的數據工程師。

我將向您介紹一個用於開發 ETL 過程的出色工具 - Apache Airflow。 但是 Airflow 是如此的多才多藝,即使你不涉及數據流,但需要定期啟動任何進程並監控它們的執行,你也應該仔細研究一下它。

是的,我不僅會講述,還會展示:該程序有很多代碼、屏幕截圖和推薦。

Apache Airflow:讓 ETL 更簡單
當你用谷歌搜索 Airflow / Wikimedia Commons 時,你通常會看到什麼

目錄

介紹

Apache Airflow 就像 Django:

  • 用python寫的
  • 有一個很棒的管理面板,
  • 可無限擴展

- 只有更好,它是為完全不同的目的而製作的,即(因為它寫在型之前):

  • 在無限數量的機器上運行和監控任務(盡可能多的 Celery / Kubernetes 和你的良心允許你)
  • 通過非常易於編寫和理解的 Python 代碼生成動態工作流
  • 以及使用現成組件和自製插件將任何數據庫和 API 相互連接的能力(這非常簡單)。

我們像這樣使用 A​​pache Airflow:

  • 我們從 DWH 和 ODS(我們有 Vertica 和 Clickhouse)中的各種來源(許多 SQL Server 和 PostgreSQL 實例、具有應用程序指標的各種 API,甚至 1C)收集數據。
  • 多麼先進 cron,它啟動 ODS 上的數據整合過程,並監控它們的維護。

直到最近,我們的需求都由一台具有 32 個內核和 50 GB RAM 的小型服務器來滿足。 在氣流中,這有效:

  • более 200 達格 (實際上是我們填充任務的工作流程),
  • 在每個平均 70個任務,
  • 這種善良開始(也平均) 每小時一次.

關於我們如何擴展,我將在下面寫,但現在讓我們定義我們將解決的 über 問題:

原來的SQL Server有50台,每台有XNUMX個數據庫——分別是一個項目的實例,它們的結構都是一樣的(幾乎無處不在,mua-ha-ha),也就是說每台都有一個Orders表(還好有一個表名稱可以推入任何業務)。 我們通過添加服務字段(源服務器、源數據庫、ETL 任務 ID)來獲取數據,然後天真地將它們放入 Vertica 中。

我們走吧!

主要部分,實用(和一點理論)

為什麼我們(和你)

當樹很大而我很簡單 SQL-schik 在一家俄羅斯零售店中,我們使用我們可用的兩種工具欺騙了 ETL 過程,也就是數據流:

  • Informatica 電源中心 - 一個非常廣泛的系統,非常有生產力,有自己的硬件,有自己的版本控制。 我使用了上帝禁止其功能的 1%。 為什麼? 嗯,首先,這個界面,從 380 年代的某個地方開始,在精神上給我們施加了壓力。 其次,這個裝置專為極其花哨的流程、激烈的組件重用和其他非常重要的企業技巧而設計。 關於它的成本,例如空中客車 AXNUMX 的機翼/年,我們不會說什麼。

    當心,屏幕截圖可能會傷害 30 歲以下的人

    Apache Airflow:讓 ETL 更簡單

  • SQL Server 集成服務器 - 我們在項目內部流程中使用了這個同志。 好吧,事實上:我們已經在使用 SQL Server,不使用它的 ETL 工具在某種程度上是不合理的。 它裡面的一切都很好:無論是界面漂亮,還是進度報告……但這不是我們喜歡軟件產品的原因,哦,不是為了這個。 版本它 dtsx (它是在保存時打亂節點的 XML)我們可以,但有什麼意義呢? 如何製作一個將數百個表從一台服務器拖到另一台服務器的任務包? 是的,什麼一百,你的食指會從二十塊上掉下來,點擊鼠標按鈕。 但它絕對看起來更時尚:

    Apache Airflow:讓 ETL 更簡單

我們當然在尋找出路。 案連 幾乎 來到一個自己寫的SSIS包生成器...

......然後一份新工作找到了我。 Apache Airflow 在這方面超越了我。

當我發現 ETL 過程描述是簡單的 Python 代碼時,我並沒有高興得手舞足蹈。 這就是數據流版本化和差異化的方式,將具有單一結構的表從數百個數據庫倒入一個目標成為一個半或兩個 13 英寸屏幕中的 Python 代碼的問題。

組裝集群

我們不安排一個完全的幼兒園,這裡不談完全顯而易見的事情,比如安裝 Airflow,你選擇的數據庫,Celery 和 docks 中描述的其他案例。

為了我們可以立即開始實驗,我畫了草圖 docker-compose.yml 其中:

  • 讓我們實際提高 氣流:調度程序,網絡服務器。 Flower 也將在那裡旋轉以監視 Celery 任務(因為它已經被推入 apache/airflow:1.10.10-python3.7,但我們不介意)
  • PostgreSQL的,其中Airflow將寫入其服務信息(調度器數據、執行統計等),Celery將標記已完成的任務;
  • Redis的,它將充當 Celery 的任務代理;
  • 芹菜工人,它將參與直接執行任務。
  • 到文件夾 ./dags 我們將添加帶有 dags 描述的文件。 它們會被即時拾起,因此無需在每次打噴嚏後處理整堆。

在某些地方,示例中的代碼沒有完全顯示(以免文本混亂),但在過程中修改了它。 可以在存儲庫中找到完整的工作代碼示例 https://github.com/dm-logv/airflow-tutorial.

泊塢窗,compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

注:

  • 在構圖的組裝中,我很大程度上依賴於眾所周知的圖像 puckel/docker-氣流 - 一定要檢查一下。 也許你的生活中不需要任何其他東西。
  • 所有 Airflow 設置不僅可以通過 airflow.cfg,而且還通過我惡意利用的環境變量(感謝開發人員)。
  • 自然地,它不是生產就緒的:我故意沒有把心跳放在容器上,我沒有擔心安全問題。 但是我做了最適合我們實驗者的最低限度。
  • 注意:
    • dag 文件夾必須可供調度程序和工作人員訪問。
    • 這同樣適用於所有第三方庫——它們都必須安裝在帶有調度器和工作器的機器上。

好吧,現在很簡單:

$ docker-compose up --scale worker=3

一切都起來後,可以看一下web界面:

基本概念

如果您對所有這些“dags”一無所知,那麼這裡有一本簡短的詞典:

  • 調度 - Airflow 中最重要的大叔,控制機器人努力工作,而不是人:監控日程,更新 dags,啟動任務。

    總的來說,在舊版本中,他有內存問題(不,不是失憶,而是內存洩漏),遺留參數甚至保留在配置中 run_duration — 它的重啟間隔。 但現在一切都很好。

  • DAG (又名“dag”)-“有向無環圖”,但這樣的定義很少有人知道,但實際上它是任務相互交互的容器(見下文)或類似於 SSIS 中的包和 Informatica 中的工作流.

    除了 dags 之外,可能還有 subdags,但我們很可能不會接觸到它們。

  • 有向跑 - 初始化的 dag,它被分配了自己的 execution_date. 同一個 dag 的 Dagrans 可以並行工作(當然,如果你讓你的任務是冪等的)。
  • 操作者 是負責執行特定操作的代碼片段。 運算符分為三種類型:
    • 行動像我們的最愛 PythonOperator,它可以執行任何(有效的)Python 代碼;
    • 轉讓,從一個地方到另一個地方傳輸數據,比如說, MsSqlToHiveTransfer;
    • 傳感器 另一方面,它將允許您做出反應或減慢 dag 的進一步執行,直到事件發生。 HttpSensor 可以拉取指定的端點,等待期望的響應時,開始傳輸 GoogleCloudStorageToS3Operator. 一個好奇的頭腦會問:“為什麼? 畢竟,您可以直接在運算符中進行重複!” 然後,為了不阻塞暫停操作員的任務池。 傳感器啟動、檢查並在下一次嘗試前停止。
  • 任務 - 聲明的運算符,無論類型如何,並附加到 dag 都被提升到任務級別。
  • 任務實例 - 當總體規劃者決定是時候將任務發送到表演者的戰鬥中時(就在現場,如果我們使用 LocalExecutor 或者在以下情況下發送到遠程節點 CeleryExecutor),它為它們分配一個上下文(即一組變量 - 執行參數),擴展命令或查詢模板,並將它們合併。

我們生成任務

首先,讓我們概述一下我們的 doug 的總體方案,然後我們將越來越多地深入細節,因為我們應用了一些非平凡的解決方案。

因此,以最簡單的形式,這樣的 dag 將如下所示:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

讓我們弄清楚:

  • 首先,我們導入必要的庫和 別的東西;
  • sql_server_ds - List[namedtuple[str, str]] 帶有來自 Airflow Connections 的連接名稱和我們將從中獲取盤子的數據庫;
  • dag - 我們的 dag 的公告,必須在 globals(),否則 Airflow 將找不到它。 道格還需要說:
    • 他叫什麼名字 orders - 該名稱將出現在網絡界面中,
    • 他將從 XNUMX 月 XNUMX 日午夜開始工作,
    • 它應該大約每 6 小時運行一次(對於這裡的硬漢而不是 timedelta() 可受理 cron-線 0 0 0/6 ? * * *,對於不太酷的人 - 像這樣的表達 @daily);
  • workflow() 會做主要工作,但不是現在。 現在,我們只是將我們的上下文轉儲到日誌中。
  • 現在是創建任務的簡單魔法:
    • 我們遍歷我們的資源;
    • 初始化 PythonOperator,這將執行我們的虛擬 workflow(). 不要忘記指定任務的唯一(在 dag 內)名稱​​並綁定 dag 本身。 旗幟 provide_context 反過來,會將額外的參數注入函數,我們將使用 **context.

現在,僅此而已。 我們得到了什麼:

  • Web 界面中的新 dag,
  • 將並行執行的一個半百個任務(如果 Airflow、Celery 設置和服務器容量允許的話)。

嗯,差不多明白了。

Apache Airflow:讓 ETL 更簡單
誰來安裝依賴項?

為了簡化整個事情,我搞砸了 docker-compose.yml 加工 requirements.txt 在所有節點上。

現在它不見了:

Apache Airflow:讓 ETL 更簡單

灰色方塊是由調度程序處理的任務實例。

我們稍等一下,任務被工人們搶走了:

Apache Airflow:讓 ETL 更簡單

當然,綠色的已經成功地完成了他們的工作。 紅人不是很成功。

順便說一句,我們的產品上沒有文件夾 ./dags, 機器之間沒有同步 - 所有 dags 都在 git 在我們的 Gitlab 上,Gitlab CI 在合併時將更新分發到機器 master.

關於花的一點

當工人​​們在敲打我們的奶嘴時,讓我們記住另一個可以向我們展示一些東西的工具——花。

第一頁包含有關工作節點的摘要信息:

Apache Airflow:讓 ETL 更簡單

任務最緊張的頁面:

Apache Airflow:讓 ETL 更簡單

我們經紀人狀態最無聊的頁面:

Apache Airflow:讓 ETL 更簡單

最亮的頁面是任務狀態圖及其執行時間:

Apache Airflow:讓 ETL 更簡單

我們加載欠載

所以,所有的任務都完成了,你可以抬走傷員了。

Apache Airflow:讓 ETL 更簡單

並且有很多人受傷 - 出於某種原因。 在正確使用 Airflow 的情況下,這些非常方塊表明數據肯定沒有到達。

您需要查看日誌並重新啟動失敗的任務實例。

通過單擊任何方塊,我們將看到可供我們使用的操作:

Apache Airflow:讓 ETL 更簡單

你可以帶走並清除墮落者。 也就是說,我們忘記了那裡有什麼東西失敗了,同樣的實例任務將轉到調度程序。

Apache Airflow:讓 ETL 更簡單

很明顯,用鼠標和所有紅色方塊來做這件事不是很人性化——這不是我們對 Airflow 的期望。 當然,我們擁有大規模殺傷性武器: Browse/Task Instances

Apache Airflow:讓 ETL 更簡單

讓我們一次選擇所有內容並重置為零,單擊正確的項目:

Apache Airflow:讓 ETL 更簡單

清洗後,我們的出租車是這樣的(它們已經在等待調度器來調度它們了):

Apache Airflow:讓 ETL 更簡單

連接、鉤子和其他變量

是時候看看下一個 DAG 了, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

每個人都做過報告更新嗎? 這又是她:有一個從哪裡獲取數據的來源列表; 有一個清單放在哪裡; 當一切發生或破裂時,不要忘記按喇叭(好吧,這與我們無關,不)。

讓我們再次瀏覽該文件並查看新的晦澀內容:

  • from commons.operators import TelegramBotSendMessage - 沒有什麼能阻止我們製作自己的操作員,我們通過製作一個小包裝器來向 Unblocked 發送消息來利用它。 (我們將在下面詳細討論這個運算符);
  • default_args={} - dag 可以將相同的參數分配給它的所有操作員;
  • to='{{ var.value.all_the_kings_men }}' - 場地 to 我們不會硬編碼,而是使用 Jinja 和一個帶有電子郵件列表的變量動態生成,我小心地輸入了 Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — 啟動操作員的條件。 在我們的例子中,只有在所有依賴關係都解決後,這封信才會飛到老闆那裡 成功地;
  • tg_bot_conn_id='tg_main' - 爭論 conn_id 接受我們創建的連接 ID Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Telegram 中的消息只有在任務掉落時才會飛走;
  • task_concurrency=1 - 我們禁止同時啟動一項任務的多個任務實例。 否則,我們將同時啟動多個 VerticaOperator (看著一張桌子);
  • report_update >> [email, tg] - 全部 VerticaOperator 集中在發送信件和消息上,像這樣:
    Apache Airflow:讓 ETL 更簡單

    但是由於通知運營商有不同的啟動條件,所以只有一個會起作用。 在樹視圖中,一切看起來都不那麼直觀:
    Apache Airflow:讓 ETL 更簡單

我會說幾句話 宏指令 和他們的朋友—— 變數.

宏是 Jinja 佔位符,可以將各種有用信息替換為運算符參數。 例如,像這樣:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} 將擴展到上下文變量的內容 execution_date 格式 YYYY-MM-DD: 2020-07-14. 最好的部分是上下文變量被固定到特定的任務實例(樹視圖中的一個正方形),並且在重新啟動時,佔位符將擴展為相同的值。

可以使用每個任務實例上的 Rendered 按鈕查看分配的值。 這是發送一封信的任務:

Apache Airflow:讓 ETL 更簡單

所以在發送消息的任務中:

Apache Airflow:讓 ETL 更簡單

最新可用版本的內置宏的完整列表可在此處獲得: 宏參考

此外,在插件的幫助下,我們可以聲明自己的宏,但那是另一回事了。

除了預定義的東西,我們還可以替換我們變量的值(我已經在上面的代碼中使用過這個)。 讓我們創造 Admin/Variables 幾件事:

Apache Airflow:讓 ETL 更簡單

你可以使用的一切:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

該值可以是標量,也可以是 JSON。 如果是 JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

只需使用所需密鑰的路徑: {{ var.json.bot_config.bot.token }}.

我會從字面上說一個詞並顯示一張屏幕截圖 連接. 這裡的一切都是基本的:在頁面上 Admin/Connections 我們創建一個連接,在那裡添加我們的登錄名/密碼和更具體的參數。 像這樣:

Apache Airflow:讓 ETL 更簡單

密碼可以加密(比默認的更徹底),或者你可以省略連接類型(就像我為 tg_main) - 事實上,類型列表在 Airflow 模型中是硬連線的,如果不進入源代碼就無法擴展(如果我突然沒有谷歌一些東西,請糾正我),但沒有什麼能阻止我們僅僅通過姓名。

您還可以使用相同的名稱建立多個連接:在這種情況下,方法 BaseHook.get_connection(), 這讓我們通過名字聯繫起來, 會給 隨機的 來自幾個同名的(製作 Round Robin 更合乎邏輯,但讓我們把它留給 Airflow 開發人員的良心)。

變量和連接當然是很酷的工具,但重要的是不要失去平衡:將流的哪些部分存儲在代碼本身中,以及將哪些部分交給 Airflow 進行存儲。 一方面,可以方便地通過 UI 快速更改值,例如郵箱。 另一方面,這仍然是對鼠標點擊的回歸,我們(我)想從中擺脫掉。

使用連接是其中一項任務 掛鉤. 一般來說,Airflow hooks 是連接第三方服務和庫的點。 例如, JiraHook 會打開一個客戶端讓我們和Jira進行交互(可以來回移動任務),並藉助於 SambaHook 您可以將本地文件推送到 smb-觀點。

解析自定義運算符

我們接近了解它是如何製作的 TelegramBotSendMessage

代碼 commons/operators.py 與實際操作員:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

在這裡,就像 Airflow 中的其他所有內容一樣,一切都非常簡單:

  • 繼承自 BaseOperator,它實現了很多特定於 Airflow 的東西(看你的空閒時間)
  • 聲明的字段 template_fields, Jinja 將在其中尋找要處理的宏。
  • 安排了正確的論點 __init__(), 必要時設置默認值。
  • 我們也沒有忘記祖先的初始化。
  • 開啟對應的鉤子 TelegramBotHook從中接收到一個客戶端對象。
  • 覆蓋(重新定義)方法 BaseOperator.execute(),Airfow 將在啟動操作員時抽動 - 我們將在其中執行主要操作,忘記登錄。 (順便說一句,我們登錄 stdout и stderr - Airflow 將攔截所有內容,將其精美包裝,並在必要時分解。)

讓我們看看我們有什麼 commons/hooks.py. 文件的第一部分,帶有鉤子本身:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

我什至不知道在這裡解釋什麼,我只記下重點:

  • 我們繼承,考慮參數 - 在大多數情況下它將是一個: conn_id;
  • 覆蓋標準方法:我限制了自己 get_conn(),其中我按名稱獲取連接參數並僅獲取該部分 extra (這是一個 JSON 字段),我(根據我自己的說明!)在其中放置了 Telegram 機器人令牌: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • 我創建了我們的實例 TelegramBot,給它一個特定的標記。

就這樣。 您可以使用 TelegramBotHook().clentTelegramBotHook().get_conn().

文件的第二部分,我在其中為 Telegram REST API 製作了一個微包裝器,以免拖拽相同的 python-telegram-bot 對於一種方法 sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

正確的方法是全部加起來: TelegramBotSendMessage, TelegramBotHook, TelegramBot - 在插件中,放入公共存儲庫,並將其提供給 Open Source。

當我們正在研究這一切時,我們的報告更新成功失敗並在頻道中向我發送錯誤消息。 我去看看有沒有錯...

Apache Airflow:讓 ETL 更簡單
我們的總督有什麼東西壞了! 這不正是我們所期待的嗎? 確切地!

你要倒嗎?

你覺得我錯過了什麼嗎? 看來他答應把數據從 SQL Server 轉移到 Vertica,然後他就把它拿走了,扯遠了,這個無賴!

這種暴行是故意的,我只需要為你破譯一些術語。 現在你可以走得更遠了。

我們的計劃是這樣的:

  1. 做 dag
  2. 生成任務
  3. 看看一切是多麼美好
  4. 將會話編號分配給填充
  5. 從 SQL Server 獲取數據
  6. 將數據放入 Vertica
  7. 收集統計數據

所以,為了讓這一切都啟動並運行,我對我們的 docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

我們提出:

  • Vertica 作為主機 dwh 使用大多數默認設置,
  • SQL Server 的三個實例,
  • 我們用一些數據填充後者的數據庫(在任何情況下都不要查看 mssql_init.py!)

我們藉助比上次稍微複雜的命令來啟動所有功能:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

我們的奇蹟隨機發生器生成了什麼,您可以使用該項目 Data Profiling/Ad Hoc Query:

Apache Airflow:讓 ETL 更簡單
最主要的是不要給分析師看

詳細說明 ETL 會話 我不會,那裡的一切都是微不足道的:我們做一個基礎,裡面有一個標誌,我們用上下文管理器包裝所有東西,現在我們這樣做:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

會話.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

時機已到 收集我們的數據 從我們的一百五十張桌子。 讓我們藉助非常樸實無華的線條來做到這一點:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. 在鉤子的幫助下,我們從 Airflow 獲得 pymssql-連接
  2. 讓我們將日期形式的限制替換到請求中——它將被模板引擎拋入函數中。
  3. 滿足我們的要求 pandas誰會得到我們 DataFrame - 將來對我們有用。

我正在使用替換 {dt} 而不是請求參數 %s 不是因為我是邪惡的匹諾曹,而是因為 pandas 無法處理 pymssql 最後一個 params: List雖然他很想 tuple.
另請注意,開發者 pymssql 決定不再支持他了,是時候搬出去了 pyodbc.

讓我們看看 Airflow 用什麼填充了我們函數的參數:

Apache Airflow:讓 ETL 更簡單

如果沒有數據,那麼繼續下去就沒有意義了。 但認為填充成功也很奇怪。 但這不是錯誤。 啊啊啊,怎麼辦?! 這是什麼:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException 告訴 Airflow 沒有錯誤,但我們跳過了任務。 界面不會有綠色或紅色方塊,而是粉紅色。

折騰一下我們的數據 多列:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

亦即

  • 我們從中獲取訂單的數據庫,
  • 我們的泛洪會話的 ID(它會有所不同 對於每個任務),
  • 來自源和訂單 ID 的哈希值——這樣在最終數據庫中(所有內容都被倒入一個表中)我們有一個唯一的訂單 ID。

倒數第二步仍然存在:將所有內容倒入 Vertica。 而且,奇怪的是,最引人注目和最有效的方法之一是通過 CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. 我們正在製作一個特殊的接收器 StringIO.
  2. pandas 請將我們的 DataFrameCSV-線。
  3. 讓我們用一個鉤子打開一個到我們最喜歡的 Vertica 的連接。
  4. 現在有了幫助 copy() 將我們的數據直接發送到 Vertika!

我們將從驅動程序中獲取已填滿的行數,並告訴會話管理器一切正常:

session.loaded_rows = cursor.rowcount
session.successful = True

這就是全部。

在銷售中,我們手動創建目標板。 在這裡,我允許自己使用一台小型機器:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

我在用 VerticaOperator() 我創建了一個數據庫模式和一個表(當然,如果它們不存在的話)。 最主要的是正確安排依賴關係:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

總結

- 好吧, - 小老鼠說, - 是嗎,現在
你確信我是森林裡最可怕的動物嗎?

朱莉婭·唐納森,咕嚕牛

我想如果我的同事和我有一個競爭:誰能從頭開始快速創建和啟動一個 ETL 過程:他們用他們的 SSIS 和鼠標,我用 Airflow……然後我們也會比較維護的難易程度……哇,我想你會同意我會在所有方面打敗他們!

如果更認真一點,那麼 Apache Airflow - 通過以程序代碼的形式描述流程 - 完成了我的工作 戈拉澤多 更舒適和愉快。

它的無限可擴展性,無論是在插件還是可擴展性傾向方面,都讓您有機會在幾乎任何領域使用 Airflow:甚至在收集、準備和處理數據的整個週期中,甚至在發射火箭(到火星、課程)。

最終部分,參考和信息

我們為您收集的耙子

  • start_date. 是的,這已經是當地的模因了。 通過道格的主要論點 start_date 全部通過。 簡而言之,如果您指定 start_date 當前日期,和 schedule_interval - 有一天,DAG 將在明天開始,不會更早。
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    沒有更多的問題。

    還有另一個與之相關的運行時錯誤: Task is missing the start_date parameter,這通常表示您忘記綁定到 dag 運算符。

  • 全部在一台機器上。 是的,還有基礎(Airflow 本身和我們的塗層)、網絡服務器、調度程序和工作人員。 它甚至奏效了。 但是隨著時間的推移,服務的任務越來越多,當 PostgreSQL 開始在 20 秒而不是 5 毫秒響應索引時,我們就把它帶走了。
  • 本地執行器。 是的,我們還在上面坐以待斃,我們已經走到了萬丈深淵的邊緣。 到目前為止,LocalExecutor 對我們來說已經足夠了,但現在是時候擴展至少一個 worker 了,我們將不得不努力工作才能遷移到 CeleryExecutor。 鑑於你可以在一台機器上使用它,沒有什麼能阻止你甚至在服務器上使用 Celery,“當然,老實說,它永遠不會投入生產!”
  • 不使用 內置工具:
    • 連接 存儲服務憑證,
    • SLA 未命中 回應未能按時完成的任務,
    • xcom 用於元數據交換(我說 數據!)在 dag 任務之間。
  • 郵件濫用。 好吧,我能說什麼呢? 為失敗任務的所有重複設置了警報。 現在我的工作 Gmail 有超過 90 萬封來自 Airflow 的電子郵件,而網絡郵件槍口一次拒絕接收和刪除超過 100 封。

更多陷阱: Apache Airflow 陷阱

更多自動化工具

為了讓我們更多地用腦而不是用手來工作,Airflow 為我們準備了這個:

  • REST API - 他仍然處於實驗狀態,這並不妨礙他工作。 有了它,您不僅可以獲取有關 dag 和任務的信息,還可以停止/啟動 dag、創建 DAG 運行或池。
  • CLI的 - 許多工具可通過命令行使用,這些工具不僅不方便通過 WebUI 使用,而且通常不存在。 例如:
    • backfill 需要重新啟動任務實例。
      比如分析員來了說:“還有你這個戰友,1月13日到XNUMX日的數據胡說八道! 修復它,修復它,修復它,修復它!” 而你就是這樣一個滾刀:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • 基礎服務: initdb, resetdb, upgradedb, checkdb.
    • run,它允許您運行一個實例任務,甚至可以對所有依賴項進行評分。 此外,您可以通過運行它 LocalExecutor,即使你有一個 Celery 集群。
    • 做幾乎相同的事情 test,也只有在基地什麼都不寫。
    • connections 允許從 shell 大量創建連接。
  • Python API - 一種相當硬核的交互方式,專為插件而設計,而不是用小手蜂擁而至。 但誰能阻止我們去 /home/airflow/dags, 跑步 ipython 開始胡鬧? 例如,您可以使用以下代碼導出所有連接:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • 連接到 Airflow 元數據庫。 我不建議寫入它,但獲取各種特定指標的任務狀態比使用任何 API 都更快、更容易。

    假設並非我們所有的任務都是冪等的,但它們有時會掉線,這是正常的。 但是一些堵塞已經很可疑了,有必要檢查一下。

    當心SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

引用

當然,Google 發布的前十個鏈接是我書籤中 Airflow 文件夾中的內容。

以及文章中使用的鏈接:

來源: www.habr.com