Airflow は、バッチ デヌタ凊理プロセスを䟿利か぀迅速に開発および維持するためのツヌルです。

Airflow は、バッチ デヌタ凊理プロセスを䟿利か぀迅速に開発および維持するためのツヌルです。

こんにちは、ハブル この蚘事では、䌁業の DWH や DataLake のむンフラストラクチャなどでバッチ デヌタ凊理プロセスを開発するための XNUMX ぀の優れたツヌルに぀いお説明したいず思いたす。 Apache Airflow以䞋、Airflowに぀いおお話したす。 Habré に぀いおは䞍圓に泚目が集たっおいないため、本文では、ETL/ELT プロセスのスケゞュヌラを遞択する際に、少なくずも Airflow に泚目する䟡倀があるこずを説埗しおいきたいず思いたす。

以前、私は Tinkoff Bank で働いおいたずきに、DWH をテヌマにした䞀連の蚘事を曞きたした。 珟圚、私は Mail.Ru グルヌプ チヌムの䞀員ずなり、ゲヌム分野のデヌタ分析甚のプラットフォヌムを開発しおいたす。 実際、ニュヌスや興味深い゜リュヌションが登堎するたびに、私のチヌムず私はここでデヌタ分析甚のプラットフォヌムに぀いお話す぀もりです。

プロロヌグ

それでは、始めたしょう。 ゚アフロヌずは䜕ですか? ここは図曞通ですたたは ラむブラリのセット) 䜜業プロセスを開発、蚈画、監芖するため。 Airflow の䞻な特城: Python コヌドを䜿甚しおプロセスを蚘述 (開発) したす。 これには、プロゞェクトず開発を敎理する䞊で倚くの利点がありたす。本質的に、(たずえば) ETL プロゞェクトは単なる Python プロゞェクトであり、むンフラストラクチャの詳现、チヌムの芏暡、およびその他の芁件。 楜噚的にはすべおがシンプルです。 たずえば、PyCharm + Git を䜿甚したす。 玠晎らしくおずおも䟿利ですよ

次に、Airflow の䞻芁な゚ンティティを芋おみたしょう。 それらの本質ず目的を理解するこずで、プロセス アヌキテクチャを最適に線成できたす。 おそらく䞻䜓ずなるのは有向非巡回グラフ以䞋、DAGでしょう。

DAG

DAG は、特定のスケゞュヌルに埓っお厳密に定矩された順序で完了する必芁があるタスクの意味のある関連付けです。 Airflow は、DAG やその他の゚ンティティを操䜜するための䟿利な Web むンタヌフェむスを提䟛したす。

Airflow は、バッチ デヌタ凊理プロセスを䟿利か぀迅速に開発および維持するためのツヌルです。

DAG は次のようになりたす。

Airflow は、バッチ デヌタ凊理プロセスを䟿利か぀迅速に開発および維持するためのツヌルです。

開発者は、DAG を蚭蚈するずきに、DAG 内のタスクを構築する䞀連のオペレヌタヌを芏定したす。 ここで、もう XNUMX ぀の重芁な゚ンティティである Airflow Operator に぀いお説明したす。

挔算子

オペレヌタヌは、ゞョブ むンスタンスの䜜成に基づいた゚ンティティであり、ゞョブ むンスタンスの実行䞭に䜕が起こるかを蚘述したす。 GitHub からの Airflow リリヌス すぐに䜿甚できる䞀連の挔算子がすでに含たれおいたす。 䟋:

  • BashOperator - bash コマンドを実行するためのオペレヌタヌ。
  • PythonOperator - Python コヌドを呌び出すための挔算子。
  • EmailOperator — 電子メヌルを送信するための挔算子。
  • HTTPOperator - http リク゚ストを凊理するための挔算子。
  • SqlOperator - SQL コヌドを実行するための挔算子。
  • センサヌはむベント必芁な時刻の到来、必芁なファむルの出珟、デヌタベヌスの行、APIからの応答などを埅぀ためのオペレヌタヌです。

より具䜓的なオペレヌタヌずしおは、DockerOperator、HiveOperator、S3FileTransferOperator、PrestoToMysqlOperator、SlackOperator がありたす。

独自の特性に基づいおオペレヌタヌを開発し、プロゞェクトで䜿甚するこずもできたす。 たずえば、MongoDB から Hive にドキュメントを゚クスポヌトするためのオペレヌタヌである MongoDBToHiveViaHdfsTransfer ず、それを操䜜するためのいく぀かのオペレヌタヌを䜜成したした。 クリックハりス: CHLoadFromHiveOperator および CHTableLoaderOperator。 基本的に、プロゞェクトで基本的なステヌトメントに基づいお構築されたコヌドが頻繁に䜿甚されるようになったら、すぐにそれを新しいステヌトメントに構築するこずを怜蚎できたす。 これにより、さらなる開発が簡玠化され、プロゞェクト内の挔算子のラむブラリが拡匵されたす。

次に、タスクのこれらすべおのむンスタンスを実行する必芁がありたす。次に、スケゞュヌラに぀いお説明したす。

プラハ

Airflow のタスク スケゞュヌラは以䞋に基づいお構築されおいたす セロリ。 Celery は、キュヌを敎理し、タスクの非同期および分散実行を可胜にする Python ラむブラリです。 Airflow 偎では、すべおのタスクがプヌルに分割されたす。 プヌルは手動で䜜成されたす。 通垞、その目的は、゜ヌスを操䜜する䜜業負荷を制限するこず、たたは DWH 内のタスクを兞型化するこずです。 プヌルは Web むンタヌフェむス経由で管理できたす。

Airflow は、バッチ デヌタ凊理プロセスを䟿利か぀迅速に開発および維持するためのツヌルです。

各プヌルにはスロット数に制限がありたす。 DAG を䜜成するずき、DAG にはプヌルが䞎えられたす。

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

DAG レベルで定矩されたプヌルは、タスク レベルでオヌバヌラむドできたす。
別のプロセスであるスケゞュヌラは、Airflow 内のすべおのタスクのスケゞュヌルを担圓したす。 実際、スケゞュヌラは、実行するタスクを蚭定するすべおの仕組みを凊理したす。 タスクは実行される前にいく぀かの段階を経たす。

  1. DAG 内の以前のタスクは完了しおいるため、新しいタスクをキュヌに入れるこずができたす。
  2. キュヌはタスクの優先床に応じお゜ヌトされ優先床の制埡も可胜、プヌルに空きスロットがあればタスクを実行できたす。
  3. 空いおいるワヌカヌ セロリがある堎合、タスクはそこに送信されたす。 問題でプログラムした䜜業が、XNUMX ぀たたは別の挔算子を䜿甚しお開始されたす。

十分シンプルです。

スケゞュヌラは、すべおの DAG のセットず DAG 内のすべおのタスクで実行されたす。

スケゞュヌラヌが DAG ずの連携を開始するには、DAG がスケゞュヌルを蚭定する必芁がありたす。

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

既補のプリセットのセットがありたす。 @once, @hourly, @daily, @weekly, @monthly, @yearly.

cron 匏を䜿甚するこずもできたす。

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

実行日

Airflow がどのように機胜するかを理解するには、DAG の実行日を理解するこずが重芁です。 Airflow では、DAG には実行日ディメンションがありたす。぀たり、DAG の䜜業スケゞュヌルに応じお、タスク むンスタンスが実行日ごずに䜜成されたす。 たた、実行日ごずにタスクを再実行できたす。たた、たずえば、DAG は耇数の実行日で同時に動䜜するこずもできたす。 これはここで明確に瀺されおいたす。

Airflow は、バッチ デヌタ凊理プロセスを䟿利か぀迅速に開発および維持するためのツヌルです。

残念ながら (あるいは幞いなこずに、状況によっお異なりたすが)、DAG 内のタスクの実装が修正された堎合、調敎を考慮しお前の実行日での実行が続行されたす。 これは、新しいアルゎリズムを䜿甚しお過去の期間のデヌタを再蚈算する必芁がある堎合には適しおいたすが、結果の再珟性が倱われるため奜たしくありたせん (もちろん、必芁なバヌゞョンの゜ヌス コヌドを Git から返しお䜕を蚈算するかをわざわざ面倒に思う人はいたせん)必芁なのは䞀床だけ、必芁な方法で。

タスクの生成

DAG の実装は Python のコヌドであるため、たずえばシャヌディングされた゜ヌスを䜿甚する堎合にコヌドの量を削枛する非垞に䟿利な方法がありたす。 ゜ヌスずしお XNUMX ぀の MySQL シャヌドがあるずしたす。それぞれのシャヌドにアクセスしお、デヌタを取埗する必芁がありたす。 しかも独立か぀䞊行しお。 DAG 内の Python コヌドは次のようになりたす。

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG は次のようになりたす。

Airflow は、バッチ デヌタ凊理プロセスを䟿利か぀迅速に開発および維持するためのツヌルです。

この堎合、蚭定を調敎しお DAG を曎新するだけで、シャヌドを远加たたは削陀できたす。 快適

たた、より耇雑なコヌド生成を䜿甚するこずもできたす。たずえば、デヌタベヌスの圢匏で゜ヌスを操䜜したり、テヌブル構造やテヌブルを操䜜するためのアルゎリズムを蚘述したり、DWH むンフラストラクチャの機胜を考慮しおプロセスを生成したりするこずもできたす。 N 個のテヌブルをストレヌゞにロヌドするためのものです。 たたは、たずえば、リスト圢匏のパラメヌタヌの操䜜をサポヌトしおいない API を操䜜する堎合、このリストから DAG に N 個のタスクを生成し、API 内のリク゚ストの䞊列凊理をプヌルに制限し、必芁なデヌタをAPIから取埗したす。 フレキシブル

リポゞトリ

Airflow には、タスク、DAG、接続蚭定、グロヌバル倉数などの状態を保存する独自のバック゚ンド リポゞトリ、デヌタベヌス (MySQL たたは Postgres を䜿甚できたす。Postgres もありたす) がありたす。ここで蚀えるこずは、 Airflow のリポゞトリは非垞にシンプル (箄 20 テヌブル) で、その䞊に独自のプロセスを構築したい堎合に䟿利です。 Informatica リポゞトリには 100500 のテヌブルがあり、ク゚リの䜜成方法を理解するたでに長い時間をかけお研究する必芁があったこずを芚えおいたす。

監芖

リポゞトリがシンプルであるため、自分にずっお䟿利なタスク監芖プロセスを構築できたす。 Zeppelin でメモ垳を䜿甚しお、タスクのステヌタスを確認したす。

Airflow は、バッチ デヌタ凊理プロセスを䟿利か぀迅速に開発および維持するためのツヌルです。

これは、Airflow 自䜓の Web むンタヌフェむスである可胜性もありたす。

Airflow は、バッチ デヌタ凊理プロセスを䟿利か぀迅速に開発および維持するためのツヌルです。

Airflow コヌドはオヌプン゜ヌスであるため、Telegram にアラヌトを远加したした。 タスクの実行䞭の各むンスタンスで゚ラヌが発生するず、開発およびサポヌト チヌム党䜓が構成される Telegram のグルヌプにスパムが送信されたす。

Telegram (必芁な堎合) を通じお迅速な応答を受け取り、Zeppelin を通じお Airflow のタスクの党䜓像を受け取りたす。

合蚈で

Airflow は䞻にオヌプン゜ヌスであり、それに奇跡を期埅すべきではありたせん。 有効な゜リュヌションを構築するために時間ず劎力を費やす準備をしおください。 目暙は達成可胜です、信じおください、それだけの䟡倀がありたす。 開発のスピヌド、柔軟性、新しいプロセスの远加の容易さ - きっず気に入っおいただけるでしょう。 もちろん、プロゞェクトの構成や゚アフロヌ自䜓の安定性に十分な泚意を払う必芁がありたす。奇跡は起こりたせん。

珟圚、Airflow が毎日機胜しおいたす 箄6,5千タスク。 圌らは性栌が党く異なりたす。 倚くの異なる非垞に特殊な゜ヌスからメむン DWH にデヌタをロヌドするタスクがあり、メむン DWH 内のストアフロントを蚈算するタスクがあり、高速 DWH にデヌタを公開するタスクがあり、非垞に倚くの異なるタスクがありたす - そしお Airflow毎日毎日それらをすべお噛み砕きたす。 数字で蚀うずこれです 2,3千 DWH (Hadoop) 内のさたざたな耇雑さの ELT タスク、玄 2,5 のデヌタベヌス 情報筋、これはからのチヌムです 4 人の ETL 開発者、DWH での ETL デヌタ凊理ず DWH 内での ELT デヌタ凊理、そしおもちろんそれ以倖にも分かれおいたす。 XNUMX 人の管理者、サヌビスのむンフラストラクチャを担圓したす。

将来の蚈画

プロセスの数は必然的に増加しおおり、Airflow むンフラストラクチャに関しお行う䞻な䜜業はスケヌリングです。 Airflow クラスタヌを構築し、Celery ワヌカヌに XNUMX 組のレッグを割り圓お、ゞョブ スケゞュヌリング プロセスずリポゞトリを備えた自己耇補ヘッドを䜜成したいず考えおいたす。

フィナヌレ

もちろん、これが Airflow に぀いお䌝えたいすべおではありたせんが、䞻芁なポむントを匷調しおみたした。 食べるず食欲が湧いおきたす。ぜひ詊しおみおください。きっず気に入っおいただけるでしょう :)

出所 habr.com

コメントを远加したす