Airflow ass en Tool fir bequem a séier Batchdatenveraarbechtungsprozesser z'entwéckelen an z'erhalen

Airflow ass en Tool fir bequem a séier Batchdatenveraarbechtungsprozesser z'entwéckelen an z'erhalen

Moien, Habr! An dësem Artikel wëll ech iwwer ee super Tool schwätzen fir Batchdatenveraarbechtungsprozesser z'entwéckelen, zum Beispill an der Infrastruktur vun enger Firmen-DWH oder Ärem DataLake. Mir schwätzen iwwer Apache Airflow (nodréiglech genannt Airflow). Et ass ongerecht vun der Opmierksamkeet op Habré entzunn, an am Haaptdeel wäert ech probéieren Iech ze iwwerzeegen datt op d'mannst Airflow et wäert ass ze kucken wann Dir e Scheduler fir Är ETL / ELT Prozesser auswielt.

Virdrun hunn ech eng Serie vun Artikelen iwwert d'Thema vun DWH geschriwwen wann ech bei Tinkoff Bank geschafft. Elo sinn ech en Deel vum Mail.Ru Group Team ginn an entwéckelen eng Plattform fir Datenanalyse am Spillberäich. Eigentlech, wéi Neiegkeeten an interessant Léisunge erschéngen, wäerte mäi Team an ech hei iwwer eis Plattform fir Datenanalyse schwätzen.

Prolog

Also, loosst eis ufänken. Wat ass Airflow? Dëst ass eng Bibliothéik (oder Set vu Bibliothéiken) Aarbechtsprozesser z'entwéckelen, ze plangen an ze iwwerwaachen. D'Haaptfeature vum Airflow: Python Code gëtt benotzt fir Prozesser ze beschreiwen (entwéckelen). Dëst huet vill Virdeeler fir Äre Projet an Entwécklung ze organiséieren: am Wesentlechen ass Ären (zum Beispill) ETL Projet just e Python Projet, an Dir kënnt et organiséieren wéi Dir wëllt, andeems Dir d'Spezifizitéite vun der Infrastruktur, der Teamgréisst an aner Ufuerderunge. Instrumental ass alles einfach. Benotzt zum Beispill PyCharm + Git. Et ass wonnerbar a ganz bequem!

Loosst eis elo d'Haaptentitéite vum Airflow kucken. Andeems Dir hir Essenz an Zweck versteet, kënnt Dir Är Prozessarchitektur optimal organiséieren. Vläicht ass d'Haaptentitéit de Directed Acyclic Graph (nachfolgend DAG bezeechent).

TAG

En DAG ass eng sënnvoll Associatioun vun Ären Aufgaben, déi Dir wëllt an enger strikt definéierter Sequenz no engem spezifesche Zäitplang ausfëllen. Airflow bitt eng praktesch Webinterface fir mat DAGs an aner Entitéiten ze schaffen:

Airflow ass en Tool fir bequem a séier Batchdatenveraarbechtungsprozesser z'entwéckelen an z'erhalen

Den DAG kéint esou ausgesinn:

Airflow ass en Tool fir bequem a séier Batchdatenveraarbechtungsprozesser z'entwéckelen an z'erhalen

Den Entwéckler, wann en DAG designt, leet eng Rei vun Bedreiwer fest, op deenen Aufgaben am DAG gebaut ginn. Hei komme mer zu enger anerer wichteger Entitéit: Airflow Operator.

Betreiber

En Bedreiwer ass eng Entitéit op Basis vun deenen Jobinstanzen erstallt ginn, déi beschreift wat wärend der Ausféierung vun enger Jobinstanz geschitt. Airflow verëffentlecht vu GitHub enthalen schonn eng Rei vun Opérateuren prett ze benotzen. Beispiller:

  • BashOperator - Bedreiwer fir e Bash Kommando auszeféieren.
  • PythonOperator - Bedreiwer fir Python Code ze ruffen.
  • EmailOperator - Bedreiwer fir E-Mail ze schécken.
  • HTTPOperator - Bedreiwer fir mat http Ufroen ze schaffen.
  • SqlOperator - Bedreiwer fir SQL Code auszeféieren.
  • Sensor ass en Bedreiwer fir op en Event ze waarden (d'Arrivée vun der erfuerderter Zäit, d'Erscheinung vun der erfuerderter Datei, eng Zeil an der Datebank, eng Äntwert vun der API, etc., etc.).

Et gi méi spezifesch Betreiber: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Dir kënnt och Betreiber entwéckelen op Basis vun Ären eegene Charakteristiken a benotzen se an Ärem Projet. Zum Beispill hu mir MongoDBToHiveViaHdfsTransfer erstallt, e Bedreiwer fir Dokumenter vu MongoDB op Hive ze exportéieren, a verschidde Bedreiwer fir mat ze schaffen clickhouse: CHLoadFromHiveOperator an CHTableLoaderOperator. Wesentlech, soubal e Projet dacks Code benotzt huet op Basis Aussoen gebaut, kënnt Dir drun denken et an eng nei Ausso ze bauen. Dëst wäert weider Entwécklung vereinfachen, an Dir wäert Är Bibliothéik vun Opérateuren am Projet expandéieren.

Als nächst musse all dës Instanzen vun Aufgaben ausgefouert ginn, an elo wäerte mir iwwer de Scheduler schwätzen.

Scheduler

Dem Airflow säin Task Scheduler ass op gebaut Zelleriszalot. Sellerie ass eng Python Bibliothéik déi Iech erlaabt eng Schlaang plus asynchron a verdeelt Ausféierung vun Aufgaben ze organiséieren. Op der Airflow Säit sinn all Aufgaben a Poole opgedeelt. Poole ginn manuell erstallt. Typesch ass hiren Zweck d'Aarbechtsbelaaschtung vun der Aarbecht mat der Quell ze limitéieren oder Aufgaben an der DWH ze typéieren. Poole kënnen iwwer de Webinterface geréiert ginn:

Airflow ass en Tool fir bequem a séier Batchdatenveraarbechtungsprozesser z'entwéckelen an z'erhalen

All Pool huet eng Limite op d'Zuel vun Plaze . Wann Dir en DAG erstellt, kritt et e Pool:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

E Pool definéiert um DAG Niveau kann um Taskniveau iwwerschratt ginn.
En separaten Prozess, Scheduler, ass verantwortlech fir all Aufgaben am Airflow ze plangen. Eigentlech beschäftegt de Scheduler all d'Mechanik fir Aufgaben fir d'Ausféierung ze setzen. D'Aufgab geet duerch e puer Etappen ier se ausgefouert gëtt:

  1. Déi viregt Aufgaben sinn am DAG ofgeschloss ginn, eng nei kann an der Schlaang stoen.
  2. D'Schlaang ass jee no der Prioritéit vun Aufgaben zortéiert (Prioritéiten kënnen och kontrolléiert ginn), a wann et e gratis Slot am Pool ass, kann d'Aufgab a Betrib geholl ginn.
  3. Wann et e gratis Aarbechter Sellerie ass, gëtt d'Aufgab un et geschéckt; d'Aarbecht, déi Dir am Problem programméiert, fänkt un, mat engem oder aneren Bedreiwer.

Einfach genuch.

Scheduler leeft op de Set vun all DAGs an all Aufgaben bannent DAGs.

Fir de Scheduler mat DAG ze schaffen, muss den DAG e Zäitplang setzen:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Et gëtt e Set vu fäerdege Presets: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Dir kënnt och Cron Ausdréck benotzen:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Ausféierung Datum

Fir ze verstoen wéi Airflow funktionnéiert, ass et wichteg ze verstoen wat Ausféierungsdatum fir en DAG ass. Am Airflow huet DAG eng Ausféierungsdatum Dimensioun, dat heescht, ofhängeg vum DAG säin Aarbechtsplang, ginn Task Instanzen fir all Ausféierungsdatum erstallt. A fir all Ausféierungsdatum kënnen Aufgaben nei ausgefouert ginn - oder, zum Beispill, en DAG kann gläichzäiteg a verschiddenen Ausféierungsdatum schaffen. Dëst ass kloer hei gewisen:

Airflow ass en Tool fir bequem a séier Batchdatenveraarbechtungsprozesser z'entwéckelen an z'erhalen

Leider (oder vläicht glécklecherweis: et hänkt vun der Situatioun of), wann d'Ëmsetzung vun der Aufgab am DAG korrigéiert ass, da geet d'Ausféierung am viregten Ausféierungsdatum weider, andeems d'Upassunge berücksichtegt ginn. Dëst ass gutt wann Dir Daten an de vergaangene Perioden mat engem neien Algorithmus nei berechnen musst, awer et ass schlecht well d'Reproduzibilitéit vum Resultat verluer ass (natierlech stéiert keen Iech fir déi erfuerderlech Versioun vum Quellcode vu Git zréckzeginn a berechent wat Dir braucht eng Kéier, wéi Dir et braucht).

Aufgaben ze generéieren

D'Ëmsetzung vun der DAG ass Code am Python, also hu mir e ganz praktesche Wee fir d'Quantitéit vum Code ze reduzéieren wann Dir schafft, zum Beispill mat sharded Quellen. Loosst eis soen datt Dir dräi MySQL-Shards als Quell hutt, Dir musst an all eenzel klammen an e puer Daten ophuelen. Ausserdeem, onofhängeg a parallel. De Python Code am DAG kéint esou ausgesinn:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

Den DAG gesäit esou aus:

Airflow ass en Tool fir bequem a séier Batchdatenveraarbechtungsprozesser z'entwéckelen an z'erhalen

An dësem Fall kënnt Dir e Shard addéieren oder ewechhuelen andeems Dir einfach d'Astellungen upasst an den DAG aktualiséieren. Gemittlech!

Dir kënnt och méi komplex Code Generatioun benotzen, zum Beispill, mat Quellen an der Form vun enger Datebank ze schaffen oder eng Tabellstruktur beschreiwen, en Algorithmus fir mat enger Tabell ze schaffen, an, andeems d'Features vun der DWH Infrastruktur berücksichtegt ginn, e Prozess generéieren fir eng Luede N Dëscher an Ärer Stockage. Oder, zum Beispill, mat enger API ze schaffen, déi net mat engem Parameter an der Form vun enger Lëscht schaffen, kënnt Dir N Aufgaben an engem DAG aus dëser Lëscht generéieren, d'Parallelismus vun Ufroen an der API op e Pool limitéieren an d'Schrauwen néideg Daten aus der API. Flexibel!

repository

Airflow huet säin eegene Backend Repository, eng Datebank (kann MySQL oder Postgres sinn, mir hunn Postgres), déi d'Staaten vun Aufgaben, DAGs, Verbindungsastellungen, global Variablen, asw., etc. Hei wéilt ech soen datt de Repository am Airflow ass ganz einfach (ongeféier 20 Dëscher) a praktesch wann Dir iergendeng vun Ären eegene Prozesser uewen opbaue wëllt. Ech erënnere mech un déi 100500 Dëscher am Informatica-Repository, déi fir eng laang Zäit studéiert musse ginn ier se verstanen hunn, wéi een eng Ufro bauen.

Iwwerwaachung

Mat der Einfachheet vum Repository kënnt Dir en Task-Iwwerwaachungsprozess bauen dee fir Iech bequem ass. Mir benotzen en Notizblock am Zeppelin, wou mir de Status vun den Aufgaben kucken:

Airflow ass en Tool fir bequem a séier Batchdatenveraarbechtungsprozesser z'entwéckelen an z'erhalen

Dëst kéint och d'Webinterface vum Airflow selwer sinn:

Airflow ass en Tool fir bequem a séier Batchdatenveraarbechtungsprozesser z'entwéckelen an z'erhalen

Den Airflow Code ass Open Source, also hu mir Alarm op Telegram bäigefüügt. All Lafen Instanz vun enger Aufgab, wann e Feeler geschitt, spams de Grupp am Telegram, wou d'ganz Entwécklung an Ënnerstëtzung Equipe besteet.

Mir kréien eng prompt Äntwert duerch Telegram (wann néideg), an duerch Zeppelin kréien mir e Gesamtbild vun Aufgaben am Airflow.

Total

Airflow ass haaptsächlech Open Source, an Dir sollt keng Wonner dovun erwaarden. Bereet Iech Zäit an Effort ze setzen fir eng Léisung ze bauen déi funktionnéiert. D'Zil ass erreechbar, gleeft mir, et ass derwäert. Geschwindegkeet vun der Entwécklung, Flexibilitéit, Liichtegkeet fir nei Prozesser ze addéieren - Dir wäert et gär hunn. Natierlech musst Dir vill Opmierksamkeet op d'Organisatioun vum Projet bezuelen, d'Stabilitéit vum Airflow selwer: Wonner geschéien net.

Elo hu mir Airflow all Dag schaffen iwwer 6,5 dausend Aufgaben. Si sinn ganz ënnerschiddlech am Charakter. Et ginn Aufgaben fir Daten an den Haapt-DWH aus ville verschiddene a ganz spezifesche Quellen ze lueden, et ginn Aufgabe fir Storefronts am Haapt-DWH ze berechnen, et ginn Aufgaben fir Daten an e séieren DWH ze publizéieren, et gi vill, vill verschidden Aufgaben - an Airflow kauen se all Dag fir Dag. An Zuelen schwätzen, ass dëst 2,3 Dausend ELT Aufgaben vu variéierter Komplexitéit bannent DWH (Hadoop), ca. 2,5 honnert Datenbanken Quellen, dëst ass eng Equipe aus 4 ETL Entwéckler, déi ënnerdeelt sinn an ETL Datenveraarbechtung an DWH an ELT Datenveraarbechtung bannent DWH an natierlech méi een Admin, déi sech mat der Infrastruktur vum Service beschäftegt.

Pläng fir d'Zukunft

D'Zuel vun de Prozesser wiisst zwangsleefeg, an d'Haapt Saach mir wäerte maachen wat d'Airflow Infrastruktur ugeet ass Skaléieren. Mir wëllen en Airflow-Cluster bauen, e Paar Been fir Sellerieaarbechter verdeelen, an e selbstduplizéierte Kapp mat Aarbechtsplangprozesser an engem Repository maachen.

Epilogue

Dëst ass natierlech net alles wat ech iwwer Airflow erziele géif, awer ech hu probéiert d'Haaptpunkte ze markéieren. Appetit kënnt mam Iessen, probéiert et an Dir wäert et gär hunn :)

Source: will.com

Setzt e Commentaire