Ahoj Habr! V tomto článku chcem hovoriť o jednom skvelom nástroji na vývoj procesov dávkového spracovania dát, napríklad v infraštruktúre firemného DWH alebo vášho DataLake. Budeme hovoriť o Apache Airflow (ďalej len Airflow). Na Habré je nespravodlivo ochudobnený o pozornosť a v hlavnej časti sa vás pokúsim presvedčiť, že pri výbere plánovača pre vaše ETL/ELT procesy sa oplatí pozrieť aspoň Airflow.
Predtým som napísal sériu článkov na tému DWH, keď som pracoval v Tinkoff Bank. Teraz som sa stal súčasťou tímu Mail.Ru Group a vyvíjam platformu na analýzu údajov v oblasti hier. V skutočnosti, keď sa objavia novinky a zaujímavé riešenia, budeme tu s mojím tímom hovoriť o našej platforme na analýzu údajov.
prológ
Takže, začnime. Čo je prúdenie vzduchu? Toto je knižnica (resp
Teraz sa pozrime na hlavné entity Airflow. Pochopením ich podstaty a účelu môžete optimálne organizovať svoju procesnú architektúru. Možno hlavnou entitou je riadený acyklický graf (ďalej len DAG).
DAG
DAG je nejaké zmysluplné spojenie vašich úloh, ktoré chcete dokončiť v presne definovanom poradí podľa špecifického plánu. Airflow poskytuje pohodlné webové rozhranie pre prácu s DAG a inými entitami:
DAG môže vyzerať takto:
Vývojár pri navrhovaní DAG stanoví súbor operátorov, na ktorých budú postavené úlohy v rámci DAG. Tu sa dostávame k ďalšej dôležitej entite: Airflow Operator.
prevádzkovatelia
Operátor je entita, na základe ktorej sa vytvárajú inštancie úlohy, ktorá popisuje, čo sa stane počas vykonávania inštancie úlohy.
- BashOperator - operátor na vykonanie príkazu bash.
- PythonOperator - operátor na volanie kódu Python.
- EmailOperator — operátor na odosielanie e-mailov.
- HTTPOperator - operátor pre prácu s http požiadavkami.
- SqlOperator - operátor pre vykonávanie SQL kódu.
- Senzor je operátor čakania na udalosť (príchod požadovaného času, objavenie sa požadovaného súboru, riadok v databáze, odpoveď z API atď. atď.).
Existujú konkrétnejšie operátory: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Môžete tiež vyvinúť operátory na základe svojich vlastných charakteristík a použiť ich vo svojom projekte. Vytvorili sme napríklad MongoDBToHiveViaHdfsTransfer, operátor na export dokumentov z MongoDB do Hive, a niekoľko operátorov na prácu s
Ďalej je potrebné vykonať všetky tieto inštancie úloh a teraz budeme hovoriť o plánovači.
Plánovač
Plánovač úloh Airflow je postavený na
Každý bazén má limit na počet slotov. Pri vytváraní DAG sa mu pridelí bazén:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Skupinu definovanú na úrovni DAG možno prepísať na úrovni úlohy.
Za plánovanie všetkých úloh v Airflow je zodpovedný samostatný proces, Scheduler. V skutočnosti sa Plánovač zaoberá všetkými mechanizmami nastavovania úloh na vykonanie. Úloha pred vykonaním prechádza niekoľkými fázami:
- Predchádzajúce úlohy boli v DAG dokončené, je možné zaradiť novú.
- Front je triedený v závislosti od priority úloh (priority môžu byť tiež kontrolované) a ak je v poole voľný slot, môže byť úloha prevzatá do prevádzky.
- Ak existuje voľný robotnícky zeler, úloha sa odošle naň; začína práca, ktorú ste naprogramovali v probléme, pomocou jedného alebo druhého operátora.
Dosť jednoduché.
Plánovač beží na množine všetkých DAG a všetkých úloh v rámci DAG.
Aby Plánovač mohol začať pracovať s DAG, musí DAG nastaviť plán:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
K dispozícii je sada hotových predvolieb: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Môžete tiež použiť cron výrazy:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Dátum vykonania
Aby ste pochopili, ako Airflow funguje, je dôležité pochopiť, čo je dátum vykonania pre DAG. V Airflow má DAG dimenziu Dátum vykonania, t. j. v závislosti od pracovného plánu DAG sa inštancie úloh vytvárajú pre každý Dátum vykonania. A pre každý Dátum vykonania je možné úlohy znova vykonať - alebo napríklad DAG môže pracovať súčasne v niekoľkých dátumoch vykonania. Tu je to jasne znázornené:
Bohužiaľ (alebo možno našťastie: závisí to od situácie), ak sa opraví implementácia úlohy v DAG, bude vykonanie pokračovať v predchádzajúcom Dátume vykonania s prihliadnutím na úpravy. Je to dobré, ak potrebujete prepočítať údaje v minulých obdobiach pomocou nového algoritmu, ale je to zlé, pretože sa stratí reprodukovateľnosť výsledku (samozrejme, nikto vás neobťažuje vrátiť požadovanú verziu zdrojového kódu z Gitu a vypočítať, čo potrebujete raz, tak, ako to potrebujete).
Generovanie úloh
Implementácia DAG je kód v Pythone, takže máme veľmi pohodlný spôsob, ako znížiť množstvo kódu pri práci napríklad so shardenými zdrojmi. Povedzme, že máte tri úlomky MySQL ako zdroj, do každého musíte vliezť a vybrať nejaké údaje. Navyše nezávisle a paralelne. Kód Pythonu v DAG môže vyzerať takto:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG vyzerá takto:
V tomto prípade môžete pridať alebo odstrániť zlomok jednoduchou úpravou nastavení a aktualizáciou DAG. Pohodlné!
Môžete použiť aj zložitejšie generovanie kódu, napríklad pracovať so zdrojmi vo forme databázy alebo opísať štruktúru tabuľky, algoritmus na prácu s tabuľkou a s prihliadnutím na vlastnosti infraštruktúry DWH vygenerovať proces na načítanie N tabuliek do vášho úložiska. Alebo napríklad pri práci s API, ktoré nepodporuje prácu s parametrom vo forme zoznamu, môžete vygenerovať N úloh v DAG z tohto zoznamu, obmedziť paralelnosť požiadaviek v API na fond a zoškrabať potrebné údaje z API. Flexibilné!
Úložisko
Airflow má svoje vlastné backend úložisko, databázu (môže byť MySQL alebo Postgres, my máme Postgres), v ktorej sú uložené stavy úloh, DAG, nastavenia pripojenia, globálne premenné atď., atď. Tu by som rád povedal, že úložisko v Airflow je veľmi jednoduché (asi 20 tabuliek) a pohodlné, ak si na ňom chcete vybudovať akýkoľvek vlastný proces. Pamätám si 100500 XNUMX tabuliek v úložisku Informatica, ktoré bolo potrebné dlho študovať, kým som pochopil, ako zostaviť dotaz.
monitorovanie
Vzhľadom na jednoduchosť úložiska si môžete vytvoriť proces monitorovania úloh, ktorý vám vyhovuje. V Zeppelin používame poznámkový blok, kde sa pozeráme na stav úloh:
Môže to byť aj samotné webové rozhranie Airflow:
Kód Airflow je open source, preto sme do telegramu pridali upozornenia. Každá spustená inštancia úlohy, ak sa vyskytne chyba, spamuje skupinu v telegrame, kde sa skladá celý vývojový a podporný tím.
Dostávame rýchlu odpoveď prostredníctvom telegramu (ak je to potrebné) a prostredníctvom Zeppelinu dostávame celkový obraz o úlohách v Airflow.
Celkom
Airflow je primárne open source a nemali by ste od neho očakávať zázraky. Buďte pripravení vynaložiť čas a úsilie na vytvorenie fungujúceho riešenia. Cieľ je dosiahnuteľný, verte mi, stojí to za to. Rýchlosť vývoja, flexibilita, jednoduchosť pridávania nových procesov - to sa vám bude páčiť. Samozrejme, musíte venovať veľkú pozornosť organizácii projektu, stabilite samotného Airflow: zázraky sa nedejú.
Teraz nám Airflow funguje každý deň asi 6,5 tisíc úloh. Povahovo sú dosť rozdielne. Existujú úlohy načítania údajov do hlavného DWH z mnohých rôznych a veľmi špecifických zdrojov, existujú úlohy výpočtu výkladov v hlavnom DWH, existujú úlohy publikovania údajov do rýchleho DWH, existuje veľa, veľa rôznych úloh - a prúdenie vzduchu prežúva ich všetky deň čo deň. V číslach je to tak 2,3 tisíc Úlohy ELT rôznej zložitosti v rámci DWH (Hadoop), cca. 2,5 stoviek databáz zdrojov, toto je tím z 4 ETL vývojári, ktoré sa delia na ETL spracovanie dát v DWH a ELT spracovanie dát vo vnútri DWH a samozrejme ďalšie jeden admin, ktorý sa zaoberá infraštruktúrou služby.
Plány do budúcnosti
Počet procesov nevyhnutne rastie a hlavnou vecou, ktorú budeme robiť v súvislosti s infraštruktúrou Airflow, je škálovanie. Chceme vybudovať klaster Airflow, prideliť pár nôh pracovníkom Celery a vytvoriť samoduplikujúcu hlavu s procesmi plánovania úloh a úložiskom.
Epilóg
Toto, samozrejme, nie je všetko, čo by som chcel povedať o Airflow, ale pokúsil som sa zdôrazniť hlavné body. K jedeniu patrí aj chuť, vyskúšajte a bude vám chutiť :)
Zdroj: hab.com