Airflow je nástroj na pohodlný a rýchly vývoj a udržiavanie procesov dávkového spracovania dát

Airflow je nástroj na pohodlný a rýchly vývoj a udržiavanie procesov dávkového spracovania dát

Ahoj Habr! V tomto článku chcem hovoriť o jednom skvelom nástroji na vývoj procesov dávkového spracovania dát, napríklad v infraštruktúre firemného DWH alebo vášho DataLake. Budeme hovoriť o Apache Airflow (ďalej len Airflow). Na Habré je nespravodlivo ochudobnený o pozornosť a v hlavnej časti sa vás pokúsim presvedčiť, že pri výbere plánovača pre vaše ETL/ELT procesy sa oplatí pozrieť aspoň Airflow.

Predtým som napísal sériu článkov na tému DWH, keď som pracoval v Tinkoff Bank. Teraz som sa stal súčasťou tímu Mail.Ru Group a vyvíjam platformu na analýzu údajov v oblasti hier. V skutočnosti, keď sa objavia novinky a zaujímavé riešenia, budeme tu s mojím tímom hovoriť o našej platforme na analýzu údajov.

prológ

Takže, začnime. Čo je prúdenie vzduchu? Toto je knižnica (resp súbor knižníc) rozvíjať, plánovať a monitorovať pracovné procesy. Hlavná vlastnosť Airflow: Python kód sa používa na popis (vývoj) procesov. To má veľa výhod pre organizáciu vášho projektu a vývoj: váš (napríklad) ETL projekt je v podstate len projektom Pythonu a môžete si ho organizovať, ako chcete, berúc do úvahy špecifiká infraštruktúry, veľkosť tímu a iné požiadavky. Inštrumentálne je všetko jednoduché. Použite napríklad PyCharm + Git. Je to úžasné a veľmi pohodlné!

Teraz sa pozrime na hlavné entity Airflow. Pochopením ich podstaty a účelu môžete optimálne organizovať svoju procesnú architektúru. Možno hlavnou entitou je riadený acyklický graf (ďalej len DAG).

DAG

DAG je nejaké zmysluplné spojenie vašich úloh, ktoré chcete dokončiť v presne definovanom poradí podľa špecifického plánu. Airflow poskytuje pohodlné webové rozhranie pre prácu s DAG a inými entitami:

Airflow je nástroj na pohodlný a rýchly vývoj a udržiavanie procesov dávkového spracovania dát

DAG môže vyzerať takto:

Airflow je nástroj na pohodlný a rýchly vývoj a udržiavanie procesov dávkového spracovania dát

Vývojár pri navrhovaní DAG stanoví súbor operátorov, na ktorých budú postavené úlohy v rámci DAG. Tu sa dostávame k ďalšej dôležitej entite: Airflow Operator.

prevádzkovatelia

Operátor je entita, na základe ktorej sa vytvárajú inštancie úlohy, ktorá popisuje, čo sa stane počas vykonávania inštancie úlohy. Uvoľňuje prúdenie vzduchu z GitHubu už obsahujú sadu operátorov pripravených na použitie. Príklady:

  • BashOperator - operátor na vykonanie príkazu bash.
  • PythonOperator - operátor na volanie kódu Python.
  • EmailOperator — operátor na odosielanie e-mailov.
  • HTTPOperator - operátor pre prácu s http požiadavkami.
  • SqlOperator - operátor pre vykonávanie SQL kódu.
  • Senzor je operátor čakania na udalosť (príchod požadovaného času, objavenie sa požadovaného súboru, riadok v databáze, odpoveď z API atď. atď.).

Existujú konkrétnejšie operátory: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Môžete tiež vyvinúť operátory na základe svojich vlastných charakteristík a použiť ich vo svojom projekte. Vytvorili sme napríklad MongoDBToHiveViaHdfsTransfer, operátor na export dokumentov z MongoDB do Hive, a niekoľko operátorov na prácu s clickhouse: CHLoadFromHiveOperator a CHTableLoaderOperator. V podstate, akonáhle má projekt často používaný kód založený na základných príkazoch, môžete uvažovať o jeho začlenení do nového príkazu. To zjednoduší ďalší vývoj a rozšírite si knižnicu operátorov v projekte.

Ďalej je potrebné vykonať všetky tieto inštancie úloh a teraz budeme hovoriť o plánovači.

Plánovač

Plánovač úloh Airflow je postavený na Zeler. Celery je knižnica Pythonu, ktorá vám umožňuje organizovať front plus asynchrónne a distribuované vykonávanie úloh. Na strane prúdenia vzduchu sú všetky úlohy rozdelené do skupín. Bazény sa vytvárajú ručne. Ich účelom je zvyčajne obmedziť pracovné zaťaženie pri práci so zdrojom alebo typizovať úlohy v rámci DWH. Bazény je možné spravovať cez webové rozhranie:

Airflow je nástroj na pohodlný a rýchly vývoj a udržiavanie procesov dávkového spracovania dát

Každý bazén má limit na počet slotov. Pri vytváraní DAG sa mu pridelí bazén:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Skupinu definovanú na úrovni DAG možno prepísať na úrovni úlohy.
Za plánovanie všetkých úloh v Airflow je zodpovedný samostatný proces, Scheduler. V skutočnosti sa Plánovač zaoberá všetkými mechanizmami nastavovania úloh na vykonanie. Úloha pred vykonaním prechádza niekoľkými fázami:

  1. Predchádzajúce úlohy boli v DAG dokončené, je možné zaradiť novú.
  2. Front je triedený v závislosti od priority úloh (priority môžu byť tiež kontrolované) a ak je v poole voľný slot, môže byť úloha prevzatá do prevádzky.
  3. Ak existuje voľný robotnícky zeler, úloha sa odošle naň; začína práca, ktorú ste naprogramovali v probléme, pomocou jedného alebo druhého operátora.

Dosť jednoduché.

Plánovač beží na množine všetkých DAG a všetkých úloh v rámci DAG.

Aby Plánovač mohol začať pracovať s DAG, musí DAG nastaviť plán:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

K dispozícii je sada hotových predvolieb: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Môžete tiež použiť cron výrazy:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Dátum vykonania

Aby ste pochopili, ako Airflow funguje, je dôležité pochopiť, čo je dátum vykonania pre DAG. V Airflow má DAG dimenziu Dátum vykonania, t. j. v závislosti od pracovného plánu DAG sa inštancie úloh vytvárajú pre každý Dátum vykonania. A pre každý Dátum vykonania je možné úlohy znova vykonať - alebo napríklad DAG môže pracovať súčasne v niekoľkých dátumoch vykonania. Tu je to jasne znázornené:

Airflow je nástroj na pohodlný a rýchly vývoj a udržiavanie procesov dávkového spracovania dát

Bohužiaľ (alebo možno našťastie: závisí to od situácie), ak sa opraví implementácia úlohy v DAG, bude vykonanie pokračovať v predchádzajúcom Dátume vykonania s prihliadnutím na úpravy. Je to dobré, ak potrebujete prepočítať údaje v minulých obdobiach pomocou nového algoritmu, ale je to zlé, pretože sa stratí reprodukovateľnosť výsledku (samozrejme, nikto vás neobťažuje vrátiť požadovanú verziu zdrojového kódu z Gitu a vypočítať, čo potrebujete raz, tak, ako to potrebujete).

Generovanie úloh

Implementácia DAG je kód v Pythone, takže máme veľmi pohodlný spôsob, ako znížiť množstvo kódu pri práci napríklad so shardenými zdrojmi. Povedzme, že máte tri úlomky MySQL ako zdroj, do každého musíte vliezť a vybrať nejaké údaje. Navyše nezávisle a paralelne. Kód Pythonu v DAG môže vyzerať takto:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG vyzerá takto:

Airflow je nástroj na pohodlný a rýchly vývoj a udržiavanie procesov dávkového spracovania dát

V tomto prípade môžete pridať alebo odstrániť zlomok jednoduchou úpravou nastavení a aktualizáciou DAG. Pohodlné!

Môžete použiť aj zložitejšie generovanie kódu, napríklad pracovať so zdrojmi vo forme databázy alebo opísať štruktúru tabuľky, algoritmus na prácu s tabuľkou a s prihliadnutím na vlastnosti infraštruktúry DWH vygenerovať proces na načítanie N tabuliek do vášho úložiska. Alebo napríklad pri práci s API, ktoré nepodporuje prácu s parametrom vo forme zoznamu, môžete vygenerovať N úloh v DAG z tohto zoznamu, obmedziť paralelnosť požiadaviek v API na fond a zoškrabať potrebné údaje z API. Flexibilné!

Úložisko

Airflow má svoje vlastné backend úložisko, databázu (môže byť MySQL alebo Postgres, my máme Postgres), v ktorej sú uložené stavy úloh, DAG, nastavenia pripojenia, globálne premenné atď., atď. Tu by som rád povedal, že úložisko v Airflow je veľmi jednoduché (asi 20 tabuliek) a pohodlné, ak si na ňom chcete vybudovať akýkoľvek vlastný proces. Pamätám si 100500 XNUMX tabuliek v úložisku Informatica, ktoré bolo potrebné dlho študovať, kým som pochopil, ako zostaviť dotaz.

monitorovanie

Vzhľadom na jednoduchosť úložiska si môžete vytvoriť proces monitorovania úloh, ktorý vám vyhovuje. V Zeppelin používame poznámkový blok, kde sa pozeráme na stav úloh:

Airflow je nástroj na pohodlný a rýchly vývoj a udržiavanie procesov dávkového spracovania dát

Môže to byť aj samotné webové rozhranie Airflow:

Airflow je nástroj na pohodlný a rýchly vývoj a udržiavanie procesov dávkového spracovania dát

Kód Airflow je open source, preto sme do telegramu pridali upozornenia. Každá spustená inštancia úlohy, ak sa vyskytne chyba, spamuje skupinu v telegrame, kde sa skladá celý vývojový a podporný tím.

Dostávame rýchlu odpoveď prostredníctvom telegramu (ak je to potrebné) a prostredníctvom Zeppelinu dostávame celkový obraz o úlohách v Airflow.

Celkom

Airflow je primárne open source a nemali by ste od neho očakávať zázraky. Buďte pripravení vynaložiť čas a úsilie na vytvorenie fungujúceho riešenia. Cieľ je dosiahnuteľný, verte mi, stojí to za to. Rýchlosť vývoja, flexibilita, jednoduchosť pridávania nových procesov - to sa vám bude páčiť. Samozrejme, musíte venovať veľkú pozornosť organizácii projektu, stabilite samotného Airflow: zázraky sa nedejú.

Teraz nám Airflow funguje každý deň asi 6,5 tisíc úloh. Povahovo sú dosť rozdielne. Existujú úlohy načítania údajov do hlavného DWH z mnohých rôznych a veľmi špecifických zdrojov, existujú úlohy výpočtu výkladov v hlavnom DWH, existujú úlohy publikovania údajov do rýchleho DWH, existuje veľa, veľa rôznych úloh - a prúdenie vzduchu prežúva ich všetky deň čo deň. V číslach je to tak 2,3 tisíc Úlohy ELT rôznej zložitosti v rámci DWH (Hadoop), cca. 2,5 stoviek databáz zdrojov, toto je tím z 4 ETL vývojári, ktoré sa delia na ETL spracovanie dát v DWH a ELT spracovanie dát vo vnútri DWH a samozrejme ďalšie jeden admin, ktorý sa zaoberá infraštruktúrou služby.

Plány do budúcnosti

Počet procesov nevyhnutne rastie a hlavnou vecou, ​​ktorú budeme robiť v súvislosti s infraštruktúrou Airflow, je škálovanie. Chceme vybudovať klaster Airflow, prideliť pár nôh pracovníkom Celery a vytvoriť samoduplikujúcu hlavu s procesmi plánovania úloh a úložiskom.

Epilóg

Toto, samozrejme, nie je všetko, čo by som chcel povedať o Airflow, ale pokúsil som sa zdôrazniť hlavné body. K jedeniu patrí aj chuť, vyskúšajte a bude vám chutiť :)

Zdroj: hab.com

Pridať komentár