Čau Habr! V tomto článku chci mluvit o jednom skvělém nástroji pro vývoj procesů dávkového zpracování dat, například v infrastruktuře firemního DWH nebo vašeho DataLake. Budeme mluvit o Apache Airflow (dále jen Airflow). Ten je na Habré nespravedlivě ochuzen o pozornost a v hlavní části se vás pokusím přesvědčit, že alespoň Airflow stojí za to při výběru plánovače pro vaše ETL / ELT procesy.
Předtím jsem napsal sérii článků na téma DWH, když jsem pracoval v Tinkoff Bank. Nyní jsem se stal součástí týmu Mail.Ru Group a vyvíjím platformu pro analýzu dat v oblasti her. Ve skutečnosti, když se objeví novinky a zajímavá řešení, budeme zde s týmem hovořit o naší platformě pro analýzu dat.
Prolog
Takže, začněme. Co je proudění vzduchu? Jedná se o knihovnu (resp
Nyní se podívejme na hlavní entity Airflow. Po pochopení jejich podstaty a účelu budete optimálně organizovat architekturu procesu. Snad hlavní entitou je řízený acyklický graf (dále jen DAG).
DAG
DAG je nějaké sémantické spojení vašich úkolů, které chcete dokončit v přesně definovaném pořadí podle určitého plánu. Airflow představuje pohodlné webové rozhraní pro práci s DAG a dalšími entitami:
DAG může vypadat takto:
Při navrhování DAG vývojář stanoví sadu operátorů, na kterých budou postaveny úkoly v rámci DAG. Zde se dostáváme k další důležité entitě: Airflow Operator.
Operátoři
Operátor je entita, na jejímž základě se vytvářejí instance úlohy, která popisuje, co se bude dít během provádění instance úlohy.
- BashOperator je operátor pro provádění příkazu bash.
- PythonOperator je operátor pro volání kódu Python.
- EmailOperator – operátor pro odesílání emailů.
- HTTPOperator - operátor pro práci s http požadavky.
- SqlOperator je operátor pro provádění kódu SQL.
- Senzor je operátor pro čekání na událost (příchod požadovaného času, vzhled požadovaného souboru, řádek v databázi, odpověď z API atd. atd.).
Existují specifičtější operátoři: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Můžete také vyvinout operátory podle svých potřeb a použít je ve svém projektu. Vytvořili jsme například MongoDBToHiveViaHdfsTransfer, operátor pro export dokumentů z MongoDB do Hive, a několik operátorů pro práci s
Dále je třeba provést všechny tyto instance úloh a nyní budeme hovořit o plánovači.
Планировщик
Plánovač úloh v Airflow je postaven na
Každý fond má omezený počet slotů. Při vytváření DAG je mu přidělen fond:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Fond nastavený na úrovni DAG lze přepsat na úrovni úlohy.
Za plánování všech úloh v Airflow je zodpovědný samostatný proces, Scheduler. Plánovač se ve skutečnosti zabývá všemi mechanismy nastavování úkolů pro provádění. Úloha prochází několika fázemi, než je provedena:
- Předchozí úkoly byly v DAG dokončeny, nový lze zařadit do fronty.
- Fronta je řazena v závislosti na prioritě úkolů (priority lze také řídit), a pokud je ve fondu volný slot, lze úkol spustit.
- Pokud existuje volný dělník celer, úkol je odeslán na něj; začíná práce, kterou jste naprogramovali v úloze, pomocí jednoho nebo druhého operátoru.
Prostě jednoduché.
Plánovač běží na sadě všech DAG a všech úloh v rámci DAG.
Aby plánovač mohl začít pracovat s DAG, musí DAG nastavit plán:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
K dispozici je sada hotových předvoleb: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Můžete také použít cron výrazy:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Datum provedení
Abyste pochopili, jak Airflow funguje, je důležité pochopit, co je datum provedení pro DAG. Airflow DAG má dimenzi Execution Date, tj. v závislosti na pracovním plánu DAG jsou instance úkolů vytvářeny pro každé datum provedení. A pro každé Datum provedení lze úkoly znovu provést - nebo například DAG může pracovat současně v několika datech provedení. To je jasně ukázáno zde:
Bohužel (nebo možná naštěstí: záleží na situaci), pokud je implementace úkolu v DAG správná, pak provedení v předchozím Date Execution půjde s úpravami. To je dobré, pokud potřebujete přepočítat data v minulých obdobích pomocí nového algoritmu, ale je to špatné, protože se ztrácí reprodukovatelnost výsledku (nikdo se samozřejmě neobtěžuje vrátit požadovanou verzi zdrojového kódu z Gitu a vypočítat, co potřeba jednou, podle potřeby).
Generování úkolů
Implementace DAG je kód Python, takže máme velmi pohodlný způsob, jak snížit množství kódu při práci například se shardovanými zdroji. Předpokládejme, že máte jako zdroj tři úlomky MySQL, musíte do každého vlézt a sebrat nějaká data. A to nezávisle a paralelně. Kód Pythonu v DAG může vypadat takto:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG vypadá takto:
Zároveň můžete přidat nebo odebrat úlomek jednoduchou úpravou nastavení a aktualizací DAG. Komfortní!
Můžete také použít složitější generování kódu, například pracovat se zdroji ve formě databáze nebo popsat tabulkovou strukturu, algoritmus pro práci s tabulkou a s přihlédnutím k vlastnostem infrastruktury DWH vygenerovat proces načtení N tabulek do vašeho úložiště. Nebo například při práci s API, které nepodporuje práci s parametrem ve formě seznamu, můžete pomocí tohoto seznamu vygenerovat N úloh v DAG, omezit paralelismus požadavků v API na fond a extrahovat potřebná data z API. Flexibilní!
úložiště
Airflow má vlastní backendové úložiště, databázi (možná MySQL nebo Postgres, my máme Postgres), kde jsou uloženy stavy úkolů, DAGy, nastavení připojení, globální proměnné atd. atd. Zde bych chtěl říci, že úložiště in Airflow je velmi jednoduchý (asi 20 tabulek) a pohodlný, pokud na něm chcete postavit některý ze svých procesů. Pamatuji si 100500 XNUMX tabulek v úložišti Informatica, které se musely dlouho kouřit, než jsem pochopil, jak sestavit dotaz.
Sledování
Vzhledem k jednoduchosti úložiště můžete vytvořit proces monitorování úloh, který je pro vás vhodný. V Zeppelinu používáme poznámkový blok, kde se podíváme na stav úkolů:
Může to být také webové rozhraní samotného Airflow:
Kód proudění vzduchu je otevřený, proto jsme přidali upozornění v telegramu. Každá instance spuštěné úlohy, pokud dojde k chybě, posílá spam do skupiny Telegram, kde se skládá celý vývojový a podpůrný tým.
Dostaneme okamžitou odpověď prostřednictvím telegramu (pokud je to požadováno), prostřednictvím Zeppelinu - celkový obrázek o úkolech v Airflow.
Celkem
Airflow je především open source a nečekejte od něj zázraky. Buďte připraveni vynaložit čas a úsilí na vytvoření funkčního řešení. Cíl z kategorie dosažitelných, věřte, stojí to za to. Rychlost vývoje, flexibilita, snadné přidávání nových procesů – to si zamilujete. Samozřejmě je třeba věnovat velkou pozornost organizaci projektu, stabilitě práce samotného Airflow: neexistují žádné zázraky.
Nyní máme Airflow fungující denně asi 6,5 tisíce úkolů. Povahově se dost liší. Existují úkoly pro načítání dat do hlavního DWH z mnoha různých a velmi specifických zdrojů, existují úkoly pro výpočet výkladů uvnitř hlavního DWH, existují úkoly pro publikování dat do rychlého DWH, existuje mnoho, mnoho různých úkolů – a Airflow žvýká je celý den za dnem. Řečeno v číslech, je to tak 2,3 tisíc Úlohy ELT různé složitosti uvnitř DWH (Hadoop), o 2,5 stovky databází zdrojů, toto je příkaz z 4 ETL vývojáři, které se dělí na ETL zpracování dat v DWH a ELT zpracování dat v rámci DWH a samozřejmě další jeden admin, která se zabývá infrastrukturou služby.
Plány do budoucna
Počet procesů nevyhnutelně roste a to hlavní, co budeme dělat z hlediska infrastruktury Airflow, je škálování. Chceme vybudovat Airflow cluster, alokovat pár nohou pro pracovníky Celery a vytvořit duplicitní hlavu s procesy plánování úloh a úložištěm.
Epilog
To samozřejmě není zdaleka vše, co bych chtěl o Airflow mluvit, ale snažil jsem se zdůraznit hlavní body. S jídlem přichází chuť, vyzkoušejte a bude vám chutnat 🙂
Zdroj: www.habr.com