Szia Habr! Ebben a cikkben egy nagyszerű eszközről szeretnék beszélni a kötegelt adatfeldolgozási folyamatok fejlesztéséhez, például egy vállalati DWH vagy DataLake infrastruktúrájában. Szó lesz az Apache Airflow-ról (a továbbiakban Airflow). Igazságtalanul figyelmen kívül hagyják a Habré-t, és a fő részben megpróbálom meggyőzni, hogy legalább az Airflow-ra érdemes figyelni az ETL/ELT folyamatok ütemezőjének kiválasztásakor.
Korábban cikksorozatot írtam a DWH témában, amikor a Tinkoff Banknál dolgoztam. Mostantól a Mail.Ru Group csapatának tagja lettem, és egy platformot fejlesztek az adatelemzésre a játék területén. Valójában amint megjelennek a hírek és az érdekes megoldások, csapatommal itt fogunk beszélni az adatelemzési platformunkról.
prológus
Szóval, kezdjük. Mi az a légáramlás? Ez egy könyvtár (vagy
Most pedig nézzük meg az Airflow fő entitásait. A lényegük és céljuk megértésével optimálisan megszervezheti folyamatarchitektúráját. Talán a fő entitás az irányított aciklikus gráf (a továbbiakban: DAG).
DAG
A DAG a feladatok néhány értelmes társítása, amelyet szigorúan meghatározott sorrendben, meghatározott ütemezés szerint szeretne végrehajtani. Az Airflow kényelmes webes felületet biztosít a DAG-okkal és más entitásokkal való munkához:
A DAG így nézhet ki:
A fejlesztő a DAG tervezésekor meghatározza azokat az operátorokat, amelyekre a DAG-on belüli feladatok épülnek. Itt elérkeztünk egy másik fontos entitáshoz: az Airflow Operatorhoz.
Üzemeltetők
Az operátor egy entitás, amely alapján a jobpéldányok létrejönnek, és leírja, hogy mi fog történni egy jobpéldány végrehajtása során.
- BashOperator - operátor a bash parancs végrehajtásához.
- PythonOperator - operátor a Python kód meghívására.
- EmailOperator – e-mail küldő operátor.
- HTTPOperator – operátor a http-kérések kezeléséhez.
- SqlOperator - operátor az SQL kód végrehajtásához.
- A szenzor egy eseményre váró operátor (a szükséges idő megérkezése, a szükséges fájl megjelenése, egy sor az adatbázisban, válasz az API-tól stb. stb.).
Vannak konkrétabb operátorok: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Operátorokat is fejleszthet saját jellemzői alapján, és felhasználhatja őket a projektben. Például létrehoztuk a MongoDBToHiveViaHdfsTransfer operátort, amely a dokumentumok MongoDB-ből Hive-ba exportálására szolgál, valamint számos operátort a
Ezután a feladatok mindegyik példányát végre kell hajtani, és most az ütemezőről fogunk beszélni.
Ütemező
Az Airflow feladatütemezője erre épül
Minden készletben korlátozott a slotok száma. A DAG létrehozásakor egy készletet kap:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
A DAG szintjén meghatározott készlet felülbírálható a feladat szintjén.
Egy külön folyamat, a Scheduler felelős az Airflow összes feladatának ütemezéséért. Valójában az ütemező foglalkozik a végrehajtási feladatok beállításának minden mechanikájával. A feladat végrehajtása előtt több szakaszon megy keresztül:
- A DAG-ban a korábbi feladatok elkészültek, újat lehet sorba állítani.
- A sor a feladatok prioritása szerint rendeződik (a prioritások is szabályozhatók), és ha van szabad hely a készletben, akkor a feladat üzembe helyezhető.
- Ha van szabad munkás zeller, a feladat elküldésre kerül; kezdődik a feladatban programozott munka, egyik vagy másik operátor használatával.
Elég egyszerű.
Az ütemező az összes DAG halmazán és a DAG-okon belüli összes feladaton fut.
Ahhoz, hogy az Ütemező elkezdjen dolgozni a DAG-val, a DAG-nak be kell állítania egy ütemezést:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Van egy sor kész előbeállítás: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Használhat cron kifejezéseket is:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Végrehajtás időpontja
Az Airflow működésének megértéséhez fontos megérteni, hogy mi a végrehajtási dátum egy DAG esetében. Az Airflow-ban a DAG végrehajtási dátum dimenzióval rendelkezik, azaz a DAG munkaütemezésétől függően minden végrehajtási dátumhoz feladatpéldányok jönnek létre. És minden végrehajtási dátumnál a feladatok újra végrehajthatók - vagy például egy DAG dolgozhat egyszerre több végrehajtási napon. Ez itt jól látható:
Sajnos (vagy talán szerencsére: helyzetfüggő), ha a feladat végrehajtása a DAG-ban korrigálásra kerül, akkor az előző Végrehajtási Dátumban történő végrehajtás a kiigazítások figyelembevételével folytatódik. Ez akkor jó, ha az elmúlt időszakok adatait új algoritmussal kell újraszámolni, de rossz, mert az eredmény reprodukálhatósága elvész (persze senki nem zavar, hogy visszaadja a forráskód szükséges verzióját a Gitből, és kiszámolja, hogy mit egyszer kell, úgy, ahogyan szüksége van rá).
Feladatok generálása
A DAG megvalósítása Python kód, így nagyon kényelmes módunk van a kód mennyiségének csökkentésére, ha például szilánkos forrásokkal dolgozunk. Tegyük fel, hogy három MySQL-szilánk van forrásként, mindegyikbe be kell másznia, és fel kell vennie néhány adatot. Ráadásul önállóan és párhuzamosan. A DAG-ban lévő Python-kód így nézhet ki:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
A DAG így néz ki:
Ebben az esetben egyszerűen hozzáadhat vagy eltávolíthat egy szilánkot a beállítások módosításával és a DAG frissítésével. Kényelmes!
Használhat bonyolultabb kódgenerálást is, például dolgozhat a forrásokkal adatbázis formájában, vagy írhat le egy táblaszerkezetet, egy táblával való munkavégzésre szolgáló algoritmust, és a DWH infrastruktúra jellemzőit figyelembe véve folyamatot generálhat. N tábla betöltéséhez a tárolóba. Vagy például, ha olyan API-val dolgozik, amely nem támogatja a lista formájú paraméterekkel való munkát, ebből a listából N feladatot generálhat egy DAG-ban, korlátozhatja a kérések párhuzamosságát az API-ban egy készletre, és a szükséges adatokat az API-ból. Rugalmas!
adattár
Az Airflow-nak van saját háttértára, adatbázisa (lehet MySQL vagy Postgres, nálunk Postgres van), ami tárolja a feladatok állapotát, DAG-okat, kapcsolati beállításokat, globális változókat, stb stb. Itt szeretném elmondani, hogy a A repository az Airflow-ban nagyon egyszerű (körülbelül 20 tábla), és kényelmes, ha saját folyamatait szeretné ráépíteni. Emlékszem az Informatica repository 100500 XNUMX táblájára, amelyeket sokáig kellett tanulmányozni, mielőtt megértettem, hogyan kell felépíteni egy lekérdezést.
megfigyelés
Tekintettel az adattár egyszerűségére, olyan feladatmegfigyelési folyamatot hozhat létre, amely kényelmes az Ön számára. A Zeppelinben egy jegyzettömböt használunk, ahol megnézzük a feladatok állapotát:
Ez lehet magának az Airflow webes felülete is:
Az Airflow kód nyílt forráskódú, ezért riasztást adtunk a Telegramhoz. Egy feladat minden futó példánya, ha hiba történik, spamet küld a Telegram csoportba, ahol a teljes fejlesztői és támogatási csapat áll.
A Telegramon (ha szükséges) azonnali választ kapunk, a Zeppelinen keresztül pedig átfogó képet kapunk az Airflow feladatairól.
Összességében
Az Airflow elsősorban nyílt forráskódú, és nem szabad tőle csodát várni. Készüljön fel arra, hogy időt és energiát fordítson egy működő megoldás kidolgozására. A cél elérhető, hidd el, megéri. Fejlesztési sebesség, rugalmasság, új folyamatok hozzáadásának egyszerűsége – tetszeni fog. Természetesen nagyon oda kell figyelni a projekt megszervezésére, magának az Airflow stabilitásának: csodák nem történnek.
Most az Airflow naponta dolgozik mintegy 6,5 ezer feladatot. Jellemükben egészen eltérőek. Vannak adatok betöltése a fő DWH-ba sok különböző és nagyon specifikus forrásból, vannak a fő DWH-n belüli kirakatszámítási feladatok, vannak adatok gyors DWH-ba való közzétételének feladatai, sok-sok különböző feladat - és az Airflow megrágja őket nap mint nap. Ha számokban beszélünk, ez így van 2,3 ezer Változó bonyolultságú ELT feladatok a DWH-n belül (Hadoop), kb. 2,5 száz adatbázis forrásokból származik, ez egy csapat 4 ETL fejlesztő, amelyek fel vannak osztva ETL adatfeldolgozásra a DWH-ban és ELT adatfeldolgozásra a DWH-n belül és természetesen többre egy adminisztrátor, aki a szolgáltatás infrastruktúrájával foglalkozik.
Tervek a jövőre
A folyamatok száma elkerülhetetlenül növekszik, és az Airflow infrastruktúráját illetően a legfontosabb, amit tenni fogunk, a méretezés. Létre akarunk építeni egy Airflow klasztert, kiosztunk egy pár lábat a Celery dolgozóinak, és egy önmásoló fejet szeretnénk készíteni munkaütemezési folyamatokkal és adattárral.
Epilógus
Ez persze nem minden, amit az Airflow-ról szeretnék elmondani, de a főbb pontokat igyekeztem kiemelni. Evés közben megjön az étvágy, próbáld ki és tetszeni fog :)
Forrás: will.com