大家好,我是 Dmitry Logvinenko - Vezet 集團公司分析部門的數據工程師。
我將向您介紹一個用於開發 ETL 過程的出色工具 - Apache Airflow。 但是 Airflow 是如此的多才多藝,即使你不涉及數據流,但需要定期啟動任何進程並監控它們的執行,你也應該仔細研究一下它。
是的,我不僅會講述,還會展示:該程序有很多代碼、屏幕截圖和推薦。
當你用谷歌搜索 Airflow / Wikimedia Commons 時,你通常會看到什麼
目錄
介紹
Apache Airflow 就像 Django:
- 用python寫的
- 有一個很棒的管理面板,
- 可無限擴展
- 只有更好,它是為完全不同的目的而製作的,即(因為它寫在型之前):
- 在無限數量的機器上運行和監控任務(盡可能多的 Celery / Kubernetes 和你的良心允許你)
- 通過非常易於編寫和理解的 Python 代碼生成動態工作流
- 以及使用現成組件和自製插件將任何數據庫和 API 相互連接的能力(這非常簡單)。
我們像這樣使用 Apache Airflow:
- 我們從 DWH 和 ODS(我們有 Vertica 和 Clickhouse)中的各種來源(許多 SQL Server 和 PostgreSQL 實例、具有應用程序指標的各種 API,甚至 1C)收集數據。
- 多麼先進
cron
,它啟動 ODS 上的數據整合過程,並監控它們的維護。
直到最近,我們的需求都由一台具有 32 個內核和 50 GB RAM 的小型服務器來滿足。 在氣流中,這有效:
- более 200 達格 (實際上是我們填充任務的工作流程),
- 在每個平均 70個任務,
- 這種善良開始(也平均) 每小時一次.
關於我們如何擴展,我將在下面寫,但現在讓我們定義我們將解決的 über 問題:
原來的SQL Server有50台,每台有XNUMX個數據庫——分別是一個項目的實例,它們的結構都是一樣的(幾乎無處不在,mua-ha-ha),也就是說每台都有一個Orders表(還好有一個表名稱可以推入任何業務)。 我們通過添加服務字段(源服務器、源數據庫、ETL 任務 ID)來獲取數據,然後天真地將它們放入 Vertica 中。
我們走吧!
主要部分,實用(和一點理論)
為什麼我們(和你)
當樹很大而我很簡單 SQL
-schik 在一家俄羅斯零售店中,我們使用我們可用的兩種工具欺騙了 ETL 過程,也就是數據流:
- Informatica 電源中心 - 一個非常廣泛的系統,非常有生產力,有自己的硬件,有自己的版本控制。 我使用了上帝禁止其功能的 1%。 為什麼? 嗯,首先,這個界面,從 380 年代的某個地方開始,在精神上給我們施加了壓力。 其次,這個裝置專為極其花哨的流程、激烈的組件重用和其他非常重要的企業技巧而設計。 關於它的成本,例如空中客車 AXNUMX 的機翼/年,我們不會說什麼。
當心,屏幕截圖可能會傷害 30 歲以下的人
- SQL Server 集成服務器 - 我們在項目內部流程中使用了這個同志。 好吧,事實上:我們已經在使用 SQL Server,不使用它的 ETL 工具在某種程度上是不合理的。 它裡面的一切都很好:無論是界面漂亮,還是進度報告……但這不是我們喜歡軟件產品的原因,哦,不是為了這個。 版本它
dtsx
(它是在保存時打亂節點的 XML)我們可以,但有什麼意義呢? 如何製作一個將數百個表從一台服務器拖到另一台服務器的任務包? 是的,什麼一百,你的食指會從二十塊上掉下來,點擊鼠標按鈕。 但它絕對看起來更時尚:
我們當然在尋找出路。 案連 幾乎 來到一個自己寫的SSIS包生成器...
......然後一份新工作找到了我。 Apache Airflow 在這方面超越了我。
當我發現 ETL 過程描述是簡單的 Python 代碼時,我並沒有高興得手舞足蹈。 這就是數據流版本化和差異化的方式,將具有單一結構的表從數百個數據庫倒入一個目標成為一個半或兩個 13 英寸屏幕中的 Python 代碼的問題。
組裝集群
我們不安排一個完全的幼兒園,這裡不談完全顯而易見的事情,比如安裝 Airflow,你選擇的數據庫,Celery 和 docks 中描述的其他案例。
為了我們可以立即開始實驗,我畫了草圖 docker-compose.yml
其中:
- 讓我們實際提高 氣流:調度程序,網絡服務器。 Flower 也將在那裡旋轉以監視 Celery 任務(因為它已經被推入
apache/airflow:1.10.10-python3.7
,但我們不介意) - PostgreSQL的,其中Airflow將寫入其服務信息(調度器數據、執行統計等),Celery將標記已完成的任務;
- Redis的,它將充當 Celery 的任務代理;
- 芹菜工人,它將參與直接執行任務。
- 到文件夾
./dags
我們將添加帶有 dags 描述的文件。 它們會被即時拾起,因此無需在每次打噴嚏後處理整堆。
在某些地方,示例中的代碼沒有完全顯示(以免文本混亂),但在過程中修改了它。 可以在存儲庫中找到完整的工作代碼示例
https://github.com/dm-logv/airflow-tutorial .
泊塢窗,compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- broker
注:
- 在構圖的組裝中,我很大程度上依賴於眾所周知的圖像
puckel/docker-氣流 - 一定要檢查一下。 也許你的生活中不需要任何其他東西。 - 所有 Airflow 設置不僅可以通過
airflow.cfg
,而且還通過我惡意利用的環境變量(感謝開發人員)。 - 自然地,它不是生產就緒的:我故意沒有把心跳放在容器上,我沒有擔心安全問題。 但是我做了最適合我們實驗者的最低限度。
- 注意:
- dag 文件夾必須可供調度程序和工作人員訪問。
- 這同樣適用於所有第三方庫——它們都必須安裝在帶有調度器和工作器的機器上。
好吧,現在很簡單:
$ docker-compose up --scale worker=3
一切都起來後,可以看一下web界面:
基本概念
如果您對所有這些“dags”一無所知,那麼這裡有一本簡短的詞典:
- 調度 - Airflow 中最重要的大叔,控制機器人努力工作,而不是人:監控日程,更新 dags,啟動任務。
總的來說,在舊版本中,他有內存問題(不,不是失憶,而是內存洩漏),遺留參數甚至保留在配置中
run_duration
— 它的重啟間隔。 但現在一切都很好。 - DAG (又名“dag”)-“有向無環圖”,但這樣的定義很少有人知道,但實際上它是任務相互交互的容器(見下文)或類似於 SSIS 中的包和 Informatica 中的工作流.
除了 dags 之外,可能還有 subdags,但我們很可能不會接觸到它們。
- 有向跑 - 初始化的 dag,它被分配了自己的
execution_date
. 同一個 dag 的 Dagrans 可以並行工作(當然,如果你讓你的任務是冪等的)。 - 操作者 是負責執行特定操作的代碼片段。 運算符分為三種類型:
- 行動像我們的最愛
PythonOperator
,它可以執行任何(有效的)Python 代碼; - 轉讓,從一個地方到另一個地方傳輸數據,比如說,
MsSqlToHiveTransfer
; - 傳感器 另一方面,它將允許您做出反應或減慢 dag 的進一步執行,直到事件發生。
HttpSensor
可以拉取指定的端點,等待期望的響應時,開始傳輸GoogleCloudStorageToS3Operator
. 一個好奇的頭腦會問:“為什麼? 畢竟,您可以直接在運算符中進行重複!” 然後,為了不阻塞暫停操作員的任務池。 傳感器啟動、檢查並在下一次嘗試前停止。
- 行動像我們的最愛
- 任務 - 聲明的運算符,無論類型如何,並附加到 dag 都被提升到任務級別。
- 任務實例 - 當總體規劃者決定是時候將任務發送到表演者的戰鬥中時(就在現場,如果我們使用
LocalExecutor
或者在以下情況下發送到遠程節點CeleryExecutor
),它為它們分配一個上下文(即一組變量 - 執行參數),擴展命令或查詢模板,並將它們合併。
我們生成任務
首先,讓我們概述一下我們的 doug 的總體方案,然後我們將越來越多地深入細節,因為我們應用了一些非平凡的解決方案。
因此,以最簡單的形式,這樣的 dag 將如下所示:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
讓我們弄清楚:
- 首先,我們導入必要的庫和 別的東西;
sql_server_ds
-List[namedtuple[str, str]]
帶有來自 Airflow Connections 的連接名稱和我們將從中獲取盤子的數據庫;dag
- 我們的 dag 的公告,必須在globals()
,否則 Airflow 將找不到它。 道格還需要說:- 他叫什麼名字
orders
- 該名稱將出現在網絡界面中, - 他將從 XNUMX 月 XNUMX 日午夜開始工作,
- 它應該大約每 6 小時運行一次(對於這裡的硬漢而不是
timedelta()
可受理cron
-線0 0 0/6 ? * * *
,對於不太酷的人 - 像這樣的表達@daily
);
- 他叫什麼名字
workflow()
會做主要工作,但不是現在。 現在,我們只是將我們的上下文轉儲到日誌中。- 現在是創建任務的簡單魔法:
- 我們遍歷我們的資源;
- 初始化
PythonOperator
,這將執行我們的虛擬workflow()
. 不要忘記指定任務的唯一(在 dag 內)名稱並綁定 dag 本身。 旗幟provide_context
反過來,會將額外的參數注入函數,我們將使用**context
.
現在,僅此而已。 我們得到了什麼:
- Web 界面中的新 dag,
- 將並行執行的一個半百個任務(如果 Airflow、Celery 設置和服務器容量允許的話)。
嗯,差不多明白了。
誰來安裝依賴項?
為了簡化整個事情,我搞砸了 docker-compose.yml
加工 requirements.txt
在所有節點上。
現在它不見了:
灰色方塊是由調度程序處理的任務實例。
我們稍等一下,任務被工人們搶走了:
當然,綠色的已經成功地完成了他們的工作。 紅人不是很成功。
順便說一句,我們的產品上沒有文件夾
./dags
, 機器之間沒有同步 - 所有 dags 都在git
在我們的 Gitlab 上,Gitlab CI 在合併時將更新分發到機器master
.
關於花的一點
當工人們在敲打我們的奶嘴時,讓我們記住另一個可以向我們展示一些東西的工具——花。
第一頁包含有關工作節點的摘要信息:
任務最緊張的頁面:
我們經紀人狀態最無聊的頁面:
最亮的頁面是任務狀態圖及其執行時間:
我們加載欠載
所以,所有的任務都完成了,你可以抬走傷員了。
並且有很多人受傷 - 出於某種原因。 在正確使用 Airflow 的情況下,這些非常方塊表明數據肯定沒有到達。
您需要查看日誌並重新啟動失敗的任務實例。
通過單擊任何方塊,我們將看到可供我們使用的操作:
你可以帶走並清除墮落者。 也就是說,我們忘記了那裡有什麼東西失敗了,同樣的實例任務將轉到調度程序。
很明顯,用鼠標和所有紅色方塊來做這件事不是很人性化——這不是我們對 Airflow 的期望。 當然,我們擁有大規模殺傷性武器: Browse/Task Instances
讓我們一次選擇所有內容並重置為零,單擊正確的項目:
清洗後,我們的出租車是這樣的(它們已經在等待調度器來調度它們了):
連接、鉤子和其他變量
是時候看看下一個 DAG 了, update_reports.py
:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
每個人都做過報告更新嗎? 這又是她:有一個從哪裡獲取數據的來源列表; 有一個清單放在哪裡; 當一切發生或破裂時,不要忘記按喇叭(好吧,這與我們無關,不)。
讓我們再次瀏覽該文件並查看新的晦澀內容:
from commons.operators import TelegramBotSendMessage
- 沒有什麼能阻止我們製作自己的操作員,我們通過製作一個小包裝器來向 Unblocked 發送消息來利用它。 (我們將在下面詳細討論這個運算符);default_args={}
- dag 可以將相同的參數分配給它的所有操作員;to='{{ var.value.all_the_kings_men }}'
- 場地to
我們不會硬編碼,而是使用 Jinja 和一個帶有電子郵件列表的變量動態生成,我小心地輸入了Admin/Variables
;trigger_rule=TriggerRule.ALL_SUCCESS
— 啟動操作員的條件。 在我們的例子中,只有在所有依賴關係都解決後,這封信才會飛到老闆那裡 成功地;tg_bot_conn_id='tg_main'
- 爭論conn_id
接受我們創建的連接 IDAdmin/Connections
;trigger_rule=TriggerRule.ONE_FAILED
- Telegram 中的消息只有在任務掉落時才會飛走;task_concurrency=1
- 我們禁止同時啟動一項任務的多個任務實例。 否則,我們將同時啟動多個VerticaOperator
(看著一張桌子);report_update >> [email, tg]
- 全部VerticaOperator
集中在發送信件和消息上,像這樣:
但是由於通知運營商有不同的啟動條件,所以只有一個會起作用。 在樹視圖中,一切看起來都不那麼直觀:
我會說幾句話 宏指令 和他們的朋友—— 變數.
宏是 Jinja 佔位符,可以將各種有用信息替換為運算符參數。 例如,像這樣:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }}
將擴展到上下文變量的內容 execution_date
格式 YYYY-MM-DD
: 2020-07-14
. 最好的部分是上下文變量被固定到特定的任務實例(樹視圖中的一個正方形),並且在重新啟動時,佔位符將擴展為相同的值。
可以使用每個任務實例上的 Rendered 按鈕查看分配的值。 這是發送一封信的任務:
所以在發送消息的任務中:
最新可用版本的內置宏的完整列表可在此處獲得:
此外,在插件的幫助下,我們可以聲明自己的宏,但那是另一回事了。
除了預定義的東西,我們還可以替換我們變量的值(我已經在上面的代碼中使用過這個)。 讓我們創造 Admin/Variables
幾件事:
你可以使用的一切:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
該值可以是標量,也可以是 JSON。 如果是 JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}
只需使用所需密鑰的路徑: {{ var.json.bot_config.bot.token }}
.
我會從字面上說一個詞並顯示一張屏幕截圖 連接. 這裡的一切都是基本的:在頁面上 Admin/Connections
我們創建一個連接,在那裡添加我們的登錄名/密碼和更具體的參數。 像這樣:
密碼可以加密(比默認的更徹底),或者你可以省略連接類型(就像我為 tg_main
) - 事實上,類型列表在 Airflow 模型中是硬連線的,如果不進入源代碼就無法擴展(如果我突然沒有谷歌一些東西,請糾正我),但沒有什麼能阻止我們僅僅通過姓名。
您還可以使用相同的名稱建立多個連接:在這種情況下,方法 BaseHook.get_connection()
, 這讓我們通過名字聯繫起來, 會給 隨機的 來自幾個同名的(製作 Round Robin 更合乎邏輯,但讓我們把它留給 Airflow 開發人員的良心)。
變量和連接當然是很酷的工具,但重要的是不要失去平衡:將流的哪些部分存儲在代碼本身中,以及將哪些部分交給 Airflow 進行存儲。 一方面,可以方便地通過 UI 快速更改值,例如郵箱。 另一方面,這仍然是對鼠標點擊的回歸,我們(我)想從中擺脫掉。
使用連接是其中一項任務 掛鉤. 一般來說,Airflow hooks 是連接第三方服務和庫的點。 例如, JiraHook
會打開一個客戶端讓我們和Jira進行交互(可以來回移動任務),並藉助於 SambaHook
您可以將本地文件推送到 smb
-觀點。
解析自定義運算符
我們接近了解它是如何製作的 TelegramBotSendMessage
代碼 commons/operators.py
與實際操作員:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
在這裡,就像 Airflow 中的其他所有內容一樣,一切都非常簡單:
- 繼承自
BaseOperator
,它實現了很多特定於 Airflow 的東西(看你的空閒時間) - 聲明的字段
template_fields
, Jinja 將在其中尋找要處理的宏。 - 安排了正確的論點
__init__()
, 必要時設置默認值。 - 我們也沒有忘記祖先的初始化。
- 開啟對應的鉤子
TelegramBotHook
從中接收到一個客戶端對象。 - 覆蓋(重新定義)方法
BaseOperator.execute()
,Airfow 將在啟動操作員時抽動 - 我們將在其中執行主要操作,忘記登錄。 (順便說一句,我們登錄stdout
иstderr
- Airflow 將攔截所有內容,將其精美包裝,並在必要時分解。)
讓我們看看我們有什麼 commons/hooks.py
. 文件的第一部分,帶有鉤子本身:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
我什至不知道在這裡解釋什麼,我只記下重點:
- 我們繼承,考慮參數 - 在大多數情況下它將是一個:
conn_id
; - 覆蓋標準方法:我限制了自己
get_conn()
,其中我按名稱獲取連接參數並僅獲取該部分extra
(這是一個 JSON 字段),我(根據我自己的說明!)在其中放置了 Telegram 機器人令牌:{"bot_token": "YOuRAwEsomeBOtToKen"}
. - 我創建了我們的實例
TelegramBot
,給它一個特定的標記。
就這樣。 您可以使用 TelegramBotHook().clent
或 TelegramBotHook().get_conn()
.
文件的第二部分,我在其中為 Telegram REST API 製作了一個微包裝器,以免拖拽相同的 python-telegram-bot
sendMessage
.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))
正確的方法是全部加起來:
TelegramBotSendMessage
,TelegramBotHook
,TelegramBot
- 在插件中,放入公共存儲庫,並將其提供給 Open Source。
當我們正在研究這一切時,我們的報告更新成功失敗並在頻道中向我發送錯誤消息。 我去看看有沒有錯...
我們的總督有什麼東西壞了! 這不正是我們所期待的嗎? 確切地!
你要倒嗎?
你覺得我錯過了什麼嗎? 看來他答應把數據從 SQL Server 轉移到 Vertica,然後他就把它拿走了,扯遠了,這個無賴!
這種暴行是故意的,我只需要為你破譯一些術語。 現在你可以走得更遠了。
我們的計劃是這樣的:
- 做 dag
- 生成任務
- 看看一切是多麼美好
- 將會話編號分配給填充
- 從 SQL Server 獲取數據
- 將數據放入 Vertica
- 收集統計數據
所以,為了讓這一切都啟動並運行,我對我們的 docker-compose.yml
:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.py
我們提出:
- Vertica 作為主機
dwh
使用大多數默認設置, - SQL Server 的三個實例,
- 我們用一些數據填充後者的數據庫(在任何情況下都不要查看
mssql_init.py
!)
我們藉助比上次稍微複雜的命令來啟動所有功能:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
我們的奇蹟隨機發生器生成了什麼,您可以使用該項目 Data Profiling/Ad Hoc Query
:
最主要的是不要給分析師看
詳細說明 ETL 會話 我不會,那裡的一切都是微不足道的:我們做一個基礎,裡面有一個標誌,我們用上下文管理器包裝所有東西,現在我們這樣做:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
會話.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
時機已到 收集我們的數據 從我們的一百五十張桌子。 讓我們藉助非常樸實無華的線條來做到這一點:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
- 在鉤子的幫助下,我們從 Airflow 獲得
pymssql
-連接 - 讓我們將日期形式的限制替換到請求中——它將被模板引擎拋入函數中。
- 滿足我們的要求
pandas
誰會得到我們DataFrame
- 將來對我們有用。
我正在使用替換
{dt}
而不是請求參數%s
不是因為我是邪惡的匹諾曹,而是因為pandas
無法處理pymssql
最後一個params: List
雖然他很想tuple
.
另請注意,開發者pymssql
決定不再支持他了,是時候搬出去了pyodbc
.
讓我們看看 Airflow 用什麼填充了我們函數的參數:
如果沒有數據,那麼繼續下去就沒有意義了。 但認為填充成功也很奇怪。 但這不是錯誤。 啊啊啊,怎麼辦?! 這是什麼:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException
告訴 Airflow 沒有錯誤,但我們跳過了任務。 界面不會有綠色或紅色方塊,而是粉紅色。
折騰一下我們的數據 多列:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])
亦即
- 我們從中獲取訂單的數據庫,
- 我們的泛洪會話的 ID(它會有所不同 對於每個任務),
- 來自源和訂單 ID 的哈希值——這樣在最終數據庫中(所有內容都被倒入一個表中)我們有一個唯一的訂單 ID。
倒數第二步仍然存在:將所有內容倒入 Vertica。 而且,奇怪的是,最引人注目和最有效的方法之一是通過 CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
- 我們正在製作一個特殊的接收器
StringIO
. pandas
請將我們的DataFrame
如CSV
-線。- 讓我們用一個鉤子打開一個到我們最喜歡的 Vertica 的連接。
- 現在有了幫助
copy()
將我們的數據直接發送到 Vertika!
我們將從驅動程序中獲取已填滿的行數,並告訴會話管理器一切正常:
session.loaded_rows = cursor.rowcount
session.successful = True
這就是全部。
在銷售中,我們手動創建目標板。 在這裡,我允許自己使用一台小型機器:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
我在用
VerticaOperator()
我創建了一個數據庫模式和一個表(當然,如果它們不存在的話)。 最主要的是正確安排依賴關係:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> load
總結
- 好吧, - 小老鼠說, - 是嗎,現在
你確信我是森林裡最可怕的動物嗎?
朱莉婭·唐納森,咕嚕牛
我想如果我的同事和我有一個競爭:誰能從頭開始快速創建和啟動一個 ETL 過程:他們用他們的 SSIS 和鼠標,我用 Airflow……然後我們也會比較維護的難易程度……哇,我想你會同意我會在所有方面打敗他們!
如果更認真一點,那麼 Apache Airflow - 通過以程序代碼的形式描述流程 - 完成了我的工作 戈拉澤多 更舒適和愉快。
它的無限可擴展性,無論是在插件還是可擴展性傾向方面,都讓您有機會在幾乎任何領域使用 Airflow:甚至在收集、準備和處理數據的整個週期中,甚至在發射火箭(到火星、課程)。
最終部分,參考和信息
我們為您收集的耙子
start_date
. 是的,這已經是當地的模因了。 通過道格的主要論點start_date
全部通過。 簡而言之,如果您指定start_date
當前日期,和schedule_interval
- 有一天,DAG 將在明天開始,不會更早。start_date = datetime(2020, 7, 7, 0, 1, 2)
沒有更多的問題。
還有另一個與之相關的運行時錯誤:
Task is missing the start_date parameter
,這通常表示您忘記綁定到 dag 運算符。- 全部在一台機器上。 是的,還有基礎(Airflow 本身和我們的塗層)、網絡服務器、調度程序和工作人員。 它甚至奏效了。 但是隨著時間的推移,服務的任務越來越多,當 PostgreSQL 開始在 20 秒而不是 5 毫秒響應索引時,我們就把它帶走了。
- 本地執行器。 是的,我們還在上面坐以待斃,我們已經走到了萬丈深淵的邊緣。 到目前為止,LocalExecutor 對我們來說已經足夠了,但現在是時候擴展至少一個 worker 了,我們將不得不努力工作才能遷移到 CeleryExecutor。 鑑於你可以在一台機器上使用它,沒有什麼能阻止你甚至在服務器上使用 Celery,“當然,老實說,它永遠不會投入生產!”
- 不使用 內置工具:
- 連接 存儲服務憑證,
- SLA 未命中 回應未能按時完成的任務,
- xcom 用於元數據交換(我說 元數據!)在 dag 任務之間。
- 郵件濫用。 好吧,我能說什麼呢? 為失敗任務的所有重複設置了警報。 現在我的工作 Gmail 有超過 90 萬封來自 Airflow 的電子郵件,而網絡郵件槍口一次拒絕接收和刪除超過 100 封。
更多陷阱:
Apache Airflow 陷阱
更多自動化工具
為了讓我們更多地用腦而不是用手來工作,Airflow 為我們準備了這個:
REST API - 他仍然處於實驗狀態,這並不妨礙他工作。 有了它,您不僅可以獲取有關 dag 和任務的信息,還可以停止/啟動 dag、創建 DAG 運行或池。CLI的 - 許多工具可通過命令行使用,這些工具不僅不方便通過 WebUI 使用,而且通常不存在。 例如:backfill
需要重新啟動任務實例。
比如分析員來了說:“還有你這個戰友,1月13日到XNUMX日的數據胡說八道! 修復它,修復它,修復它,修復它!” 而你就是這樣一個滾刀:airflow backfill -s '2020-01-01' -e '2020-01-13' orders
- 基礎服務:
initdb
,resetdb
,upgradedb
,checkdb
. run
,它允許您運行一個實例任務,甚至可以對所有依賴項進行評分。 此外,您可以通過運行它LocalExecutor
,即使你有一個 Celery 集群。- 做幾乎相同的事情
test
,也只有在基地什麼都不寫。 connections
允許從 shell 大量創建連接。
Python API - 一種相當硬核的交互方式,專為插件而設計,而不是用小手蜂擁而至。 但誰能阻止我們去/home/airflow/dags
, 跑步ipython
開始胡鬧? 例如,您可以使用以下代碼導出所有連接:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d)
- 連接到 Airflow 元數據庫。 我不建議寫入它,但獲取各種特定指標的任務狀態比使用任何 API 都更快、更容易。
假設並非我們所有的任務都是冪等的,但它們有時會掉線,這是正常的。 但是一些堵塞已經很可疑了,有必要檢查一下。
當心SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
引用
當然,Google 發布的前十個鏈接是我書籤中 Airflow 文件夾中的內容。
Apache 氣流文檔 - 當然,我們必須從辦公室開始。 文檔,但誰會閱讀說明?最佳實踐 - 好吧,至少閱讀創作者的建議。氣流用戶界面 - 最開始:圖片中的用戶界面了解 Apache Airflow 的關鍵概念 - 基本概念描述得很好,如果(突然!)你不明白我的意思。天龍博客——Airflow服務器/集群搭建指南 - 設置 Airflow 集群的簡短指南。在 Lyft 運行 Apache Airflow - 幾乎相同的有趣文章,除了形式主義更多,示例更少。Apache Airflow 如何在 Celery worker 上分配作業 — 關於與 Celery 一起工作。Apache Airflow 中的 DAG 編寫最佳實踐 - 關於任務的冪等性、按 ID 而不是日期加載、轉換、文件結構和其他有趣的事情。在 Apache Airflow 中管理依賴關係 - 任務和触發規則的依賴關係,我只是順便提到了這一點。Airflow:當你的 DAG 遠遠落後於計劃時 - 如何克服調度程序中的一些“按預期工作”、加載丟失的數據和確定任務的優先級。對 Apache Airflow 有用的 SQL 查詢 — 對 Airflow 元數據有用的 SQL 查詢。開始使用 Apache Airflow 開發工作流 - 有一個關於創建自定義傳感器的有用部分。使用 Presto 和 Airflow 在 AWS 上構建 Fetchr Data Science Infra — 關於在 AWS 上為數據科學構建基礎設施的有趣簡短說明。調試 Airflow DAG 時要檢查的 7 個常見錯誤 - 常見錯誤(當有人仍然沒有閱讀說明時)。使用 Apache Airflow 存儲和訪問密碼 - 微笑人們如何存儲密碼,儘管你可以只使用 Connections。Python 之禪與 Apache Airflow - 隱式 DAG 轉發、函數中的上下文投擲、依賴關係以及跳過任務啟動。Airflow:鮮為人知的提示、技巧和最佳實踐 - 關於使用default arguments
иparams
在模板中,以及變量和連接。分析 Airflow Scheduler - 一個關於規劃者如何為 Airflow 2.0 做準備的故事。Apache Airflow 與 3 個 Celery 工人在 docker-compose 中 - 一篇關於在中部署我們的集群的稍微過時的文章docker-compose
.4 使用 Airflow 上下文的模板任務 - 使用模板和上下文轉發的動態任務。Airflow 中的錯誤通知 — 通過郵件和 Slack 的標準和自定義通知。Airflow 研討會:沒有拐杖的複雜 DAG - 分支任務、宏和 XCom。
以及文章中使用的鏈接:
宏觀參考 - 可用於模板的佔位符。常見的陷阱——氣流 — 創建 dag 時的常見錯誤。puckel/docker-airflow:Docker Apache Airflow -docker-compose
用於實驗、調試等。python-telegram-bot/python-telegram-bot:我們為您製作了一個您無法拒絕的包裝器 — Telegram REST API 的 Python 包裝器。
來源: www.habr.com