Airflow是一個方便快速開發和維護大量資料處理流程的工具

Airflow是一個方便快速開發和維護大量資料處理流程的工具

你好,哈布爾! 在本文中,我想討論一個用於開發大量資料處理流程的出色工具,例如在企業 DWH 或 DataLake 的基礎架構中。 我們就來談談Apache Airflow(以下簡稱Airflow)。 Habré 被不公平地剝奪了關注,在主要部分中,我將嘗試說服您,在為您的 ETL/ELT 流程選擇調度程序時,至少 Airflow 值得關注。

在此之前,我在 Tinkoff 銀行工作時寫過一系列有關 DWH 主題的文章。 現在我已經成為 Mail.Ru Group 團隊的一員,正在開發一個遊戲領域的資料分析平台。 實際上,隨著新聞和有趣的解決方案的出現,我和我的團隊將在這裡討論我們的數據分析平台。

序幕

那麼,讓我們開始吧。 什麼是氣流? 這是圖書館(或 庫集) 開發、規劃和監控工作流程。 Airflow的主要特點:使用Python程式碼來描述(開發)流程。 這對於組織您的專案和開發有很多優勢:本質上,您的(例如)ETL 專案只是一個 Python 項目,您可以按照您的意願組織它,同時考慮到基礎設施的具體情況、團隊規模和其他需求。 從工具上講,一切都很簡單。 例如使用 PyCharm + Git。 真是太棒了,而且非常方便!

現在讓我們來看看Airflow的主要實體。 透過了解它們的本質和目的,您可以最佳地組織您的流程架構。 或許最主要的實體就是有向無環圖(以下簡稱DAG)。

DAG

DAG 是您想要根據特定時間表以嚴格定義的順序完成的任務的一些有意義的關聯。 Airflow 提供了一個方便的 Web 介面來與 DAG 和其他實體一起使用:

Airflow是一個方便快速開發和維護大量資料處理流程的工具

DAG 可能如下所示:

Airflow是一個方便快速開發和維護大量資料處理流程的工具

開發人員在設計 DAG 時,會制定一組運算符,在這些運算符上建構 DAG 內的任務。 這裡我們來到另一個重要的實體:Airflow Operator。

運營商

操作員是建立作業實例的基礎上的實體,它描述了作業實例執行期間將發生的情況。 Airflow 從 GitHub 發布 已經包含一組可供使用的運算子。 例子:

  • BashOperator - 用於執行 bash 指令的運算子。
  • PythonOperator - 用於呼叫 Python 程式碼的運算子。
  • EmailOperator — 發送電子郵件的運算子。
  • HTTPOperator - 用於處理 http 請求的運算子。
  • SqlOperator - 用於執行 SQL 程式碼的運算子。
  • 感測器是一個等待事件的操作符(所需時間的到來、所需文件的出現、資料庫中的一行、API 的回應等)。

還有更具體的運算元:DockerOperator、HiveOperator、S3FileTransferOperator、PrestoToMysqlOperator、SlackOperator。

您還可以根據自己的特點開發算子並在專案中使用。 例如,我們建立了 MongoDBToHiveViaHdfsTransfer,一個用於將文件從 MongoDB 匯出到 Hive 的運算符,以及幾個用於使用的運算符 點擊之家:CHLoadFromHiveOperator 和 CHTableLoaderOperator。 本質上,一旦專案中頻繁使用基於基本語句建立的程式碼,您就可以考慮將其建置到新語句中。 這將簡化進一步的開發,並且您將擴展專案中的運算符庫。

接下來,所有這些任務實例都需要執行,現在我們將討論調度程序。

調度器

Airflow 的任務排程器建構於 芹菜。 Celery 是一個 Python 函式庫,可讓您組織佇列以及任務的非同步和分散式執行。 在 Airflow 方面,所有任務都被劃分到池中。 池是手動建立的。 通常,它們的目的是限制使用來源的工作量或典型化 DWH 中的任務。 池可以透過 Web 介面進行管理:

Airflow是一個方便快速開發和維護大量資料處理流程的工具

每個池都有插槽數量限制。 建立 DAG 時,會為其指定一個池:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

在 DAG 層級定義的池可以在任務層級覆蓋。
一個單獨的進程 Scheduler 負責調度 Airflow 中的所有任務。 實際上,調度程序處理設定要執行的任務的所有機制。 該任務在執行前會經歷幾個階段:

  1. DAG 中先前的任務已完成;新的任務可以排隊。
  2. 佇列根據任務的優先權進行排序(優先權也可以控制),如果池中有空閒槽,則可以將任務投入運作。
  3. 如果有空閒的worker celery,則將任務傳送給它; 您在問題中編寫的工作開始,使用一個或另一個運算符。

夠簡單的。

調度程序在所有 DAG 集合以及 DAG 內的所有任務上運行。

為了讓 Scheduler 開始與 DAG 一起工作,DAG 需要設定一個時間表:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

有一組現成的預設: @once, @hourly, @daily, @weekly, @monthly, @yearly.

您也可以使用 cron 表達式:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

執行日期

要了解 Airflow 的工作原理,了解 DAG 的執行日期非常重要。 在 Airflow 中,DAG 有一個執行日期維度,即根據 DAG 的工作計劃,為每個執行日期建立任務實例。 對於每個執行日期,可以重新執行任務 - 或者,例如,DAG 可以在多個執行日期同時工作。 這裡清楚地顯示了這一點:

Airflow是一個方便快速開發和維護大量資料處理流程的工具

不幸的是(或者幸運的是:這取決於具體情況),如果 DAG 中任務的實施得到糾正,則前一個執行日期的執行將考慮調整後繼續進行。 如果你需要使用新的演算法重新計算過去一段時間的數據,這很好,但它很糟糕,因為結果的再現性丟失了(當然,沒有人打擾你從Git 返回所需版本的源代碼併計算什麼)您需要一次性,以您需要的方式)。

生成任務

DAG 的實作是 Python 程式碼,因此我們有一個非常方便的方法來減少工作時的程式碼量,例如使用分片來源。 假設您有三個 MySQL 分片作為來源,您需要進入每個分片並取得一些資料。 而且,獨立且並行。 DAG 中的 Python 程式碼可能如下所示:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG 看起來像這樣:

Airflow是一個方便快速開發和維護大量資料處理流程的工具

在這種情況下,您只需調整設定並更新 DAG 即可新增或刪除分片。 舒服的!

您還可以使用更複雜的程式碼生成,例如,使用資料庫形式的來源或描述表結構、使用表的演算法,並考慮 DWH 基礎設施的功能,生成流程用於將 N 個表載入到您的儲存中。 或者,例如,使用不支援使用清單形式的參數的 API,您可以從該清單在 DAG 中產生 N 個任務,將 API 中請求的並行度限製到一個池,然後抓取來自 API 的必要資料。 靈活的!

存儲庫

Airflow有自己的後端儲存庫,一個資料庫(可以是MySQL或Postgres,我們有Postgres),它儲存任務的狀態,DAG,連接設置,全域變數等。在這裡我想我可以說Airflow 中的儲存庫非常簡單(大約20 個表),如果您想在其上建立自己的任何流程,也很方便。 我記得Informatica儲存庫中有100500個表,在了解如何建立查詢之前必須研究很長時間。

監控

鑑於儲存庫的簡單性,您可以建立一個對您來說方便的任務監控流程。 我們在 Zeppelin 中使用記事本,在其中查看任務的狀態:

Airflow是一個方便快速開發和維護大量資料處理流程的工具

這也可能是 Airflow 本身的 Web 介面:

Airflow是一個方便快速開發和維護大量資料處理流程的工具

Airflow 程式碼是開源的,因此我們在 Telegram 中新增了警報。 如果發生錯誤,任務的每個執行實例都會向整個開發和支援團隊所在的 Telegram 群組發送垃圾郵件。

我們透過 Telegram 收到及時回覆(如果需要),並透過 Zeppelin 我們收到 Airflow 中任務的整體情況。

在總

Airflow 主要是開源的,你不應該指望它能帶來奇蹟。 準備好投入時間和精力來建立有效的解決方案。 目標是可以實現的,相信我,這是值得的。 開發速度、靈活性、添加新流程的便利性 - 您一定會喜歡的。 當然,您需要非常關注專案的組織、Airflow 本身的穩定性:奇蹟不會發生。

現在我們每天都有 Airflow 工作 約 6,5 個任務。 他們的性格截然不同。 有從許多不同且非常具體的來源將資料載入到主 DWH 的任務,有在主 DWH 內計算店面的任務,有將資料發佈到快速 DWH 的任務,有很多很多不同的任務 - 以及 Airflow日復一日日地咀嚼它們。 用數字來說,這是 2,3萬 DWH (Hadoop) 中不同複雜度的 ELT 任務,約 2,5 個資料庫 消息來源,這是一支來自 4名ETL開發人員,分為DWH中的ETL資料處理和DWH中的ELT資料處理,當然還有更多 一名管理員,誰負責服務的基礎設施。

Планынабудущее

進程的數量不可避免地在增長,而我們在 Airflow 基礎設施方面要做的主要事情是擴展。 我們想要建立一個 Airflow 集群,為 Celery 工作人員分配一對腿,並製作一個具有作業調度流程和儲存庫的自我複製頭。

尾聲

當然,這並不是我想講述的有關 Airflow 的全部內容,但我試圖強調要點。 食慾是隨著吃來的,試試看一下,你會喜歡的:)

來源: www.habr.com

添加評論