Az Airflow egy eszköz a kötegelt adatfeldolgozási folyamatok kényelmes és gyors fejlesztéséhez és karbantartásához

Az Airflow egy eszköz a kötegelt adatfeldolgozási folyamatok kényelmes és gyors fejlesztéséhez és karbantartásához

Szia Habr! Ebben a cikkben egy nagyszerű eszközről szeretnék beszélni a kötegelt adatfeldolgozási folyamatok fejlesztéséhez, például egy vállalati DWH vagy DataLake infrastruktúrájában. Szó lesz az Apache Airflow-ról (a továbbiakban Airflow). Igazságtalanul figyelmen kívül hagyják a Habré-t, és a fő részben megpróbálom meggyőzni, hogy legalább az Airflow-ra érdemes figyelni az ETL/ELT folyamatok ütemezőjének kiválasztásakor.

Korábban cikksorozatot írtam a DWH témában, amikor a Tinkoff Banknál dolgoztam. Mostantól a Mail.Ru Group csapatának tagja lettem, és egy platformot fejlesztek az adatelemzésre a játék területén. Valójában amint megjelennek a hírek és az érdekes megoldások, csapatommal itt fogunk beszélni az adatelemzési platformunkról.

prológus

Szóval, kezdjük. Mi az a légáramlás? Ez egy könyvtár (vagy könyvtárak halmaza) munkafolyamatok fejlesztésére, tervezésére és nyomon követésére. Az Airflow fő jellemzője: A Python kód a folyamatok leírására (fejlesztésére) szolgál. Ennek nagyon sok előnye van a projekt és a fejlesztés megszervezésében: lényegében az Ön (például) ETL projektje csak egy Python projekt, és tetszés szerint szervezheti, figyelembe véve az infrastruktúra sajátosságait, a csapat méretét, ill. egyéb követelmények. Hangszeresen minden egyszerű. Használja például a PyCharm + Git. Csodálatos és nagyon kényelmes!

Most pedig nézzük meg az Airflow fő entitásait. A lényegük és céljuk megértésével optimálisan megszervezheti folyamatarchitektúráját. Talán a fő entitás az irányított aciklikus gráf (a továbbiakban: DAG).

DAG

A DAG a feladatok néhány értelmes társítása, amelyet szigorúan meghatározott sorrendben, meghatározott ütemezés szerint szeretne végrehajtani. Az Airflow kényelmes webes felületet biztosít a DAG-okkal és más entitásokkal való munkához:

Az Airflow egy eszköz a kötegelt adatfeldolgozási folyamatok kényelmes és gyors fejlesztéséhez és karbantartásához

A DAG így nézhet ki:

Az Airflow egy eszköz a kötegelt adatfeldolgozási folyamatok kényelmes és gyors fejlesztéséhez és karbantartásához

A fejlesztő a DAG tervezésekor meghatározza azokat az operátorokat, amelyekre a DAG-on belüli feladatok épülnek. Itt elérkeztünk egy másik fontos entitáshoz: az Airflow Operatorhoz.

Üzemeltetők

Az operátor egy entitás, amely alapján a jobpéldányok létrejönnek, és leírja, hogy mi fog történni egy jobpéldány végrehajtása során. Airflow kiadások a GitHubból már tartalmaznak használatra kész operátorkészletet. Példák:

  • BashOperator - operátor a bash parancs végrehajtásához.
  • PythonOperator - operátor a Python kód meghívására.
  • EmailOperator – e-mail küldő operátor.
  • HTTPOperator – operátor a http-kérések kezeléséhez.
  • SqlOperator - operátor az SQL kód végrehajtásához.
  • A szenzor egy eseményre váró operátor (a szükséges idő megérkezése, a szükséges fájl megjelenése, egy sor az adatbázisban, válasz az API-tól stb. stb.).

Vannak konkrétabb operátorok: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Operátorokat is fejleszthet saját jellemzői alapján, és felhasználhatja őket a projektben. Például létrehoztuk a MongoDBToHiveViaHdfsTransfer operátort, amely a dokumentumok MongoDB-ből Hive-ba exportálására szolgál, valamint számos operátort a Kattintson a Ház gombra: CHLoadFromHiveOperator és CHTableLoaderOperator. Lényegében, amint egy projekt gyakran használt alapvető utasításokra épülő kódot, elgondolkodhat azon, hogy azt egy új utasításba építse be. Ez leegyszerűsíti a további fejlesztést, és bővíti a projektben szereplő operátorok könyvtárát.

Ezután a feladatok mindegyik példányát végre kell hajtani, és most az ütemezőről fogunk beszélni.

Ütemező

Az Airflow feladatütemezője erre épül Zeller. A Celery egy Python-könyvtár, amely lehetővé teszi a várólista megszervezését, valamint a feladatok aszinkron és elosztott végrehajtását. Az Airflow oldalon minden feladat medencékre van felosztva. A medencék manuálisan jönnek létre. Jellemzően az a céljuk, hogy korlátozzák a forrással végzett munka terhelését, vagy a DWH-n belüli feladatokat tipizálják. A medencék a webes felületen keresztül kezelhetők:

Az Airflow egy eszköz a kötegelt adatfeldolgozási folyamatok kényelmes és gyors fejlesztéséhez és karbantartásához

Minden készletben korlátozott a slotok száma. A DAG létrehozásakor egy készletet kap:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

A DAG szintjén meghatározott készlet felülbírálható a feladat szintjén.
Egy külön folyamat, a Scheduler felelős az Airflow összes feladatának ütemezéséért. Valójában az ütemező foglalkozik a végrehajtási feladatok beállításának minden mechanikájával. A feladat végrehajtása előtt több szakaszon megy keresztül:

  1. A DAG-ban a korábbi feladatok elkészültek, újat lehet sorba állítani.
  2. A sor a feladatok prioritása szerint rendeződik (a prioritások is szabályozhatók), és ha van szabad hely a készletben, akkor a feladat üzembe helyezhető.
  3. Ha van szabad munkás zeller, a feladat elküldésre kerül; kezdődik a feladatban programozott munka, egyik vagy másik operátor használatával.

Elég egyszerű.

Az ütemező az összes DAG halmazán és a DAG-okon belüli összes feladaton fut.

Ahhoz, hogy az Ütemező elkezdjen dolgozni a DAG-val, a DAG-nak be kell állítania egy ütemezést:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Van egy sor kész előbeállítás: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Használhat cron kifejezéseket is:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Végrehajtás időpontja

Az Airflow működésének megértéséhez fontos megérteni, hogy mi a végrehajtási dátum egy DAG esetében. Az Airflow-ban a DAG végrehajtási dátum dimenzióval rendelkezik, azaz a DAG munkaütemezésétől függően minden végrehajtási dátumhoz feladatpéldányok jönnek létre. És minden végrehajtási dátumnál a feladatok újra végrehajthatók - vagy például egy DAG dolgozhat egyszerre több végrehajtási napon. Ez itt jól látható:

Az Airflow egy eszköz a kötegelt adatfeldolgozási folyamatok kényelmes és gyors fejlesztéséhez és karbantartásához

Sajnos (vagy talán szerencsére: helyzetfüggő), ha a feladat végrehajtása a DAG-ban korrigálásra kerül, akkor az előző Végrehajtási Dátumban történő végrehajtás a kiigazítások figyelembevételével folytatódik. Ez akkor jó, ha az elmúlt időszakok adatait új algoritmussal kell újraszámolni, de rossz, mert az eredmény reprodukálhatósága elvész (persze senki nem zavar, hogy visszaadja a forráskód szükséges verzióját a Gitből, és kiszámolja, hogy mit egyszer kell, úgy, ahogyan szüksége van rá).

Feladatok generálása

A DAG megvalósítása Python kód, így nagyon kényelmes módunk van a kód mennyiségének csökkentésére, ha például szilánkos forrásokkal dolgozunk. Tegyük fel, hogy három MySQL-szilánk van forrásként, mindegyikbe be kell másznia, és fel kell vennie néhány adatot. Ráadásul önállóan és párhuzamosan. A DAG-ban lévő Python-kód így nézhet ki:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

A DAG így néz ki:

Az Airflow egy eszköz a kötegelt adatfeldolgozási folyamatok kényelmes és gyors fejlesztéséhez és karbantartásához

Ebben az esetben egyszerűen hozzáadhat vagy eltávolíthat egy szilánkot a beállítások módosításával és a DAG frissítésével. Kényelmes!

Használhat bonyolultabb kódgenerálást is, például dolgozhat a forrásokkal adatbázis formájában, vagy írhat le egy táblaszerkezetet, egy táblával való munkavégzésre szolgáló algoritmust, és a DWH infrastruktúra jellemzőit figyelembe véve folyamatot generálhat. N tábla betöltéséhez a tárolóba. Vagy például, ha olyan API-val dolgozik, amely nem támogatja a lista formájú paraméterekkel való munkát, ebből a listából N feladatot generálhat egy DAG-ban, korlátozhatja a kérések párhuzamosságát az API-ban egy készletre, és a szükséges adatokat az API-ból. Rugalmas!

adattár

Az Airflow-nak van saját háttértára, adatbázisa (lehet MySQL vagy Postgres, nálunk Postgres van), ami tárolja a feladatok állapotát, DAG-okat, kapcsolati beállításokat, globális változókat, stb stb. Itt szeretném elmondani, hogy a A repository az Airflow-ban nagyon egyszerű (körülbelül 20 tábla), és kényelmes, ha saját folyamatait szeretné ráépíteni. Emlékszem az Informatica repository 100500 XNUMX táblájára, amelyeket sokáig kellett tanulmányozni, mielőtt megértettem, hogyan kell felépíteni egy lekérdezést.

megfigyelés

Tekintettel az adattár egyszerűségére, olyan feladatmegfigyelési folyamatot hozhat létre, amely kényelmes az Ön számára. A Zeppelinben egy jegyzettömböt használunk, ahol megnézzük a feladatok állapotát:

Az Airflow egy eszköz a kötegelt adatfeldolgozási folyamatok kényelmes és gyors fejlesztéséhez és karbantartásához

Ez lehet magának az Airflow webes felülete is:

Az Airflow egy eszköz a kötegelt adatfeldolgozási folyamatok kényelmes és gyors fejlesztéséhez és karbantartásához

Az Airflow kód nyílt forráskódú, ezért riasztást adtunk a Telegramhoz. Egy feladat minden futó példánya, ha hiba történik, spamet küld a Telegram csoportba, ahol a teljes fejlesztői és támogatási csapat áll.

A Telegramon (ha szükséges) azonnali választ kapunk, a Zeppelinen keresztül pedig átfogó képet kapunk az Airflow feladatairól.

Összességében

Az Airflow elsősorban nyílt forráskódú, és nem szabad tőle csodát várni. Készüljön fel arra, hogy időt és energiát fordítson egy működő megoldás kidolgozására. A cél elérhető, hidd el, megéri. Fejlesztési sebesség, rugalmasság, új folyamatok hozzáadásának egyszerűsége – tetszeni fog. Természetesen nagyon oda kell figyelni a projekt megszervezésére, magának az Airflow stabilitásának: csodák nem történnek.

Most az Airflow naponta dolgozik mintegy 6,5 ezer feladatot. Jellemükben egészen eltérőek. Vannak adatok betöltése a fő DWH-ba sok különböző és nagyon specifikus forrásból, vannak a fő DWH-n belüli kirakatszámítási feladatok, vannak adatok gyors DWH-ba való közzétételének feladatai, sok-sok különböző feladat - és az Airflow megrágja őket nap mint nap. Ha számokban beszélünk, ez így van 2,3 ezer Változó bonyolultságú ELT feladatok a DWH-n belül (Hadoop), kb. 2,5 száz adatbázis forrásokból származik, ez egy csapat 4 ETL fejlesztő, amelyek fel vannak osztva ETL adatfeldolgozásra a DWH-ban és ELT adatfeldolgozásra a DWH-n belül és természetesen többre egy adminisztrátor, aki a szolgáltatás infrastruktúrájával foglalkozik.

Tervek a jövőre

A folyamatok száma elkerülhetetlenül növekszik, és az Airflow infrastruktúráját illetően a legfontosabb, amit tenni fogunk, a méretezés. Létre akarunk építeni egy Airflow klasztert, kiosztunk egy pár lábat a Celery dolgozóinak, és egy önmásoló fejet szeretnénk készíteni munkaütemezési folyamatokkal és adattárral.

Epilógus

Ez persze nem minden, amit az Airflow-ról szeretnék elmondani, de a főbb pontokat igyekeztem kiemelni. Evés közben megjön az étvágy, próbáld ki és tetszeni fog :)

Forrás: will.com

Hozzászólás