Airflow je nástroj pro pohodlný a rychlý vývoj a údržbu procesů dávkového zpracování dat

Airflow je nástroj pro pohodlný a rychlý vývoj a údržbu procesů dávkového zpracování dat

Čau Habr! V tomto článku chci mluvit o jednom skvělém nástroji pro vývoj procesů dávkového zpracování dat, například v infrastruktuře firemního DWH nebo vašeho DataLake. Budeme mluvit o Apache Airflow (dále jen Airflow). Ten je na Habré nespravedlivě ochuzen o pozornost a v hlavní části se vás pokusím přesvědčit, že alespoň Airflow stojí za to při výběru plánovače pro vaše ETL / ELT procesy.

Předtím jsem napsal sérii článků na téma DWH, když jsem pracoval v Tinkoff Bank. Nyní jsem se stal součástí týmu Mail.Ru Group a vyvíjím platformu pro analýzu dat v oblasti her. Ve skutečnosti, když se objeví novinky a zajímavá řešení, budeme zde s týmem hovořit o naší platformě pro analýzu dat.

Prolog

Takže, začněme. Co je proudění vzduchu? Jedná se o knihovnu (resp soubor knihoven) vyvíjet, plánovat a monitorovat pracovní postupy. Hlavním rysem Airflow je, že kód Python se používá k popisu (vývoji) procesů. To má mnoho výhod pro organizaci vašeho projektu a vývoj: ve skutečnosti je váš (například) ETL projekt pouze projektem Pythonu a můžete si ho uspořádat, jak chcete, s ohledem na funkce infrastruktury, velikost týmu a další požadavky. . Instrumentálně je vše jednoduché. Použijte například PyCharm + Git. Je to skvělé a velmi pohodlné!

Nyní se podívejme na hlavní entity Airflow. Po pochopení jejich podstaty a účelu budete optimálně organizovat architekturu procesu. Snad hlavní entitou je řízený acyklický graf (dále jen DAG).

DAG

DAG je nějaké sémantické spojení vašich úkolů, které chcete dokončit v přesně definovaném pořadí podle určitého plánu. Airflow představuje pohodlné webové rozhraní pro práci s DAG a dalšími entitami:

Airflow je nástroj pro pohodlný a rychlý vývoj a údržbu procesů dávkového zpracování dat

DAG může vypadat takto:

Airflow je nástroj pro pohodlný a rychlý vývoj a údržbu procesů dávkového zpracování dat

Při navrhování DAG vývojář stanoví sadu operátorů, na kterých budou postaveny úkoly v rámci DAG. Zde se dostáváme k další důležité entitě: Airflow Operator.

Operátoři

Operátor je entita, na jejímž základě se vytvářejí instance úlohy, která popisuje, co se bude dít během provádění instance úlohy. Vydání Airflow z GitHubu již obsahují sadu příkazů připravených k použití. Příklady:

  • BashOperator je operátor pro provádění příkazu bash.
  • PythonOperator je operátor pro volání kódu Python.
  • EmailOperator – operátor pro odesílání emailů.
  • HTTPOperator - operátor pro práci s http požadavky.
  • SqlOperator je operátor pro provádění kódu SQL.
  • Senzor je operátor pro čekání na událost (příchod požadovaného času, vzhled požadovaného souboru, řádek v databázi, odpověď z API atd. atd.).

Existují specifičtější operátoři: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Můžete také vyvinout operátory podle svých potřeb a použít je ve svém projektu. Vytvořili jsme například MongoDBToHiveViaHdfsTransfer, operátor pro export dokumentů z MongoDB do Hive, a několik operátorů pro práci s clickhouse: CHLoadFromHiveOperator a CHTableLoaderOperator. Ve skutečnosti, jakmile projekt často používá kód postavený na základních příkazech, můžete přemýšlet o jeho zkompilování do nového příkazu. To zjednoduší další vývoj a přidáte si svou knihovnu operátorů v projektu.

Dále je třeba provést všechny tyto instance úloh a nyní budeme hovořit o plánovači.

Планировщик

Plánovač úloh v Airflow je postaven na Celer. Celery je knihovna Pythonu, která vám umožňuje organizovat frontu plus asynchronní a distribuované provádění úloh. Ze strany Airflow jsou všechny úkoly rozděleny do bazénů. Pool se vytváří ručně. Jejich účelem je zpravidla omezit zátěž při práci se zdrojem nebo zadávat úlohy uvnitř DWH. Bazény lze spravovat přes webové rozhraní:

Airflow je nástroj pro pohodlný a rychlý vývoj a údržbu procesů dávkového zpracování dat

Každý fond má omezený počet slotů. Při vytváření DAG je mu přidělen fond:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Fond nastavený na úrovni DAG lze přepsat na úrovni úlohy.
Za plánování všech úloh v Airflow je zodpovědný samostatný proces, Scheduler. Plánovač se ve skutečnosti zabývá všemi mechanismy nastavování úkolů pro provádění. Úloha prochází několika fázemi, než je provedena:

  1. Předchozí úkoly byly v DAG dokončeny, nový lze zařadit do fronty.
  2. Fronta je řazena v závislosti na prioritě úkolů (priority lze také řídit), a pokud je ve fondu volný slot, lze úkol spustit.
  3. Pokud existuje volný dělník celer, úkol je odeslán na něj; začíná práce, kterou jste naprogramovali v úloze, pomocí jednoho nebo druhého operátoru.

Prostě jednoduché.

Plánovač běží na sadě všech DAG a všech úloh v rámci DAG.

Aby plánovač mohl začít pracovat s DAG, musí DAG nastavit plán:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

K dispozici je sada hotových předvoleb: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Můžete také použít cron výrazy:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Datum provedení

Abyste pochopili, jak Airflow funguje, je důležité pochopit, co je datum provedení pro DAG. Airflow DAG má dimenzi Execution Date, tj. v závislosti na pracovním plánu DAG jsou instance úkolů vytvářeny pro každé datum provedení. A pro každé Datum provedení lze úkoly znovu provést - nebo například DAG může pracovat současně v několika datech provedení. To je jasně ukázáno zde:

Airflow je nástroj pro pohodlný a rychlý vývoj a údržbu procesů dávkového zpracování dat

Bohužel (nebo možná naštěstí: záleží na situaci), pokud je implementace úkolu v DAG správná, pak provedení v předchozím Date Execution půjde s úpravami. To je dobré, pokud potřebujete přepočítat data v minulých obdobích pomocí nového algoritmu, ale je to špatné, protože se ztrácí reprodukovatelnost výsledku (nikdo se samozřejmě neobtěžuje vrátit požadovanou verzi zdrojového kódu z Gitu a vypočítat, co potřeba jednou, podle potřeby).

Generování úkolů

Implementace DAG je kód Python, takže máme velmi pohodlný způsob, jak snížit množství kódu při práci například se shardovanými zdroji. Předpokládejme, že máte jako zdroj tři úlomky MySQL, musíte do každého vlézt a sebrat nějaká data. A to nezávisle a paralelně. Kód Pythonu v DAG může vypadat takto:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG vypadá takto:

Airflow je nástroj pro pohodlný a rychlý vývoj a údržbu procesů dávkového zpracování dat

Zároveň můžete přidat nebo odebrat úlomek jednoduchou úpravou nastavení a aktualizací DAG. Komfortní!

Můžete také použít složitější generování kódu, například pracovat se zdroji ve formě databáze nebo popsat tabulkovou strukturu, algoritmus pro práci s tabulkou a s přihlédnutím k vlastnostem infrastruktury DWH vygenerovat proces načtení N tabulek do vašeho úložiště. Nebo například při práci s API, které nepodporuje práci s parametrem ve formě seznamu, můžete pomocí tohoto seznamu vygenerovat N úloh v DAG, omezit paralelismus požadavků v API na fond a extrahovat potřebná data z API. Flexibilní!

úložiště

Airflow má vlastní backendové úložiště, databázi (možná MySQL nebo Postgres, my máme Postgres), kde jsou uloženy stavy úkolů, DAGy, nastavení připojení, globální proměnné atd. atd. Zde bych chtěl říci, že úložiště in Airflow je velmi jednoduchý (asi 20 tabulek) a pohodlný, pokud na něm chcete postavit některý ze svých procesů. Pamatuji si 100500 XNUMX tabulek v úložišti Informatica, které se musely dlouho kouřit, než jsem pochopil, jak sestavit dotaz.

Sledování

Vzhledem k jednoduchosti úložiště můžete vytvořit proces monitorování úloh, který je pro vás vhodný. V Zeppelinu používáme poznámkový blok, kde se podíváme na stav úkolů:

Airflow je nástroj pro pohodlný a rychlý vývoj a údržbu procesů dávkového zpracování dat

Může to být také webové rozhraní samotného Airflow:

Airflow je nástroj pro pohodlný a rychlý vývoj a údržbu procesů dávkového zpracování dat

Kód proudění vzduchu je otevřený, proto jsme přidali upozornění v telegramu. Každá instance spuštěné úlohy, pokud dojde k chybě, posílá spam do skupiny Telegram, kde se skládá celý vývojový a podpůrný tým.

Dostaneme okamžitou odpověď prostřednictvím telegramu (pokud je to požadováno), prostřednictvím Zeppelinu - celkový obrázek o úkolech v Airflow.

Celkem

Airflow je především open source a nečekejte od něj zázraky. Buďte připraveni vynaložit čas a úsilí na vytvoření funkčního řešení. Cíl z kategorie dosažitelných, věřte, stojí to za to. Rychlost vývoje, flexibilita, snadné přidávání nových procesů – to si zamilujete. Samozřejmě je třeba věnovat velkou pozornost organizaci projektu, stabilitě práce samotného Airflow: neexistují žádné zázraky.

Nyní máme Airflow fungující denně asi 6,5 tisíce úkolů. Povahově se dost liší. Existují úkoly pro načítání dat do hlavního DWH z mnoha různých a velmi specifických zdrojů, existují úkoly pro výpočet výkladů uvnitř hlavního DWH, existují úkoly pro publikování dat do rychlého DWH, existuje mnoho, mnoho různých úkolů – a Airflow žvýká je celý den za dnem. Řečeno v číslech, je to tak 2,3 tisíc Úlohy ELT různé složitosti uvnitř DWH (Hadoop), o 2,5 stovky databází zdrojů, toto je příkaz z 4 ETL vývojáři, které se dělí na ETL zpracování dat v DWH a ELT zpracování dat v rámci DWH a samozřejmě další jeden admin, která se zabývá infrastrukturou služby.

Plány do budoucna

Počet procesů nevyhnutelně roste a to hlavní, co budeme dělat z hlediska infrastruktury Airflow, je škálování. Chceme vybudovat Airflow cluster, alokovat pár nohou pro pracovníky Celery a vytvořit duplicitní hlavu s procesy plánování úloh a úložištěm.

Epilog

To samozřejmě není zdaleka vše, co bych chtěl o Airflow mluvit, ale snažil jsem se zdůraznit hlavní body. S jídlem přichází chuť, vyzkoušejte a bude vám chutnat 🙂

Zdroj: www.habr.com

Přidat komentář