Moien, Habr! An dësem Artikel wëll ech iwwer ee super Tool schwätzen fir Batchdatenveraarbechtungsprozesser z'entwéckelen, zum Beispill an der Infrastruktur vun enger Firmen-DWH oder Ärem DataLake. Mir schwätzen iwwer Apache Airflow (nodréiglech genannt Airflow). Et ass ongerecht vun der Opmierksamkeet op Habré entzunn, an am Haaptdeel wäert ech probéieren Iech ze iwwerzeegen datt op d'mannst Airflow et wäert ass ze kucken wann Dir e Scheduler fir Är ETL / ELT Prozesser auswielt.
Virdrun hunn ech eng Serie vun Artikelen iwwert d'Thema vun DWH geschriwwen wann ech bei Tinkoff Bank geschafft. Elo sinn ech en Deel vum Mail.Ru Group Team ginn an entwéckelen eng Plattform fir Datenanalyse am Spillberäich. Eigentlech, wéi Neiegkeeten an interessant Léisunge erschéngen, wäerte mäi Team an ech hei iwwer eis Plattform fir Datenanalyse schwätzen.
Prolog
Also, loosst eis ufänken. Wat ass Airflow? Dëst ass eng Bibliothéik (oder
Loosst eis elo d'Haaptentitéite vum Airflow kucken. Andeems Dir hir Essenz an Zweck versteet, kënnt Dir Är Prozessarchitektur optimal organiséieren. Vläicht ass d'Haaptentitéit de Directed Acyclic Graph (nachfolgend DAG bezeechent).
TAG
En DAG ass eng sënnvoll Associatioun vun Ären Aufgaben, déi Dir wëllt an enger strikt definéierter Sequenz no engem spezifesche Zäitplang ausfëllen. Airflow bitt eng praktesch Webinterface fir mat DAGs an aner Entitéiten ze schaffen:
Den DAG kéint esou ausgesinn:
Den Entwéckler, wann en DAG designt, leet eng Rei vun Bedreiwer fest, op deenen Aufgaben am DAG gebaut ginn. Hei komme mer zu enger anerer wichteger Entitéit: Airflow Operator.
Betreiber
En Bedreiwer ass eng Entitéit op Basis vun deenen Jobinstanzen erstallt ginn, déi beschreift wat wärend der Ausféierung vun enger Jobinstanz geschitt.
- BashOperator - Bedreiwer fir e Bash Kommando auszeféieren.
- PythonOperator - Bedreiwer fir Python Code ze ruffen.
- EmailOperator - Bedreiwer fir E-Mail ze schécken.
- HTTPOperator - Bedreiwer fir mat http Ufroen ze schaffen.
- SqlOperator - Bedreiwer fir SQL Code auszeféieren.
- Sensor ass en Bedreiwer fir op en Event ze waarden (d'Arrivée vun der erfuerderter Zäit, d'Erscheinung vun der erfuerderter Datei, eng Zeil an der Datebank, eng Äntwert vun der API, etc., etc.).
Et gi méi spezifesch Betreiber: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Dir kënnt och Betreiber entwéckelen op Basis vun Ären eegene Charakteristiken a benotzen se an Ärem Projet. Zum Beispill hu mir MongoDBToHiveViaHdfsTransfer erstallt, e Bedreiwer fir Dokumenter vu MongoDB op Hive ze exportéieren, a verschidde Bedreiwer fir mat ze schaffen
Als nächst musse all dës Instanzen vun Aufgaben ausgefouert ginn, an elo wäerte mir iwwer de Scheduler schwätzen.
Scheduler
Dem Airflow säin Task Scheduler ass op gebaut
All Pool huet eng Limite op d'Zuel vun Plaze . Wann Dir en DAG erstellt, kritt et e Pool:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
E Pool definéiert um DAG Niveau kann um Taskniveau iwwerschratt ginn.
En separaten Prozess, Scheduler, ass verantwortlech fir all Aufgaben am Airflow ze plangen. Eigentlech beschäftegt de Scheduler all d'Mechanik fir Aufgaben fir d'Ausféierung ze setzen. D'Aufgab geet duerch e puer Etappen ier se ausgefouert gëtt:
- Déi viregt Aufgaben sinn am DAG ofgeschloss ginn, eng nei kann an der Schlaang stoen.
- D'Schlaang ass jee no der Prioritéit vun Aufgaben zortéiert (Prioritéiten kënnen och kontrolléiert ginn), a wann et e gratis Slot am Pool ass, kann d'Aufgab a Betrib geholl ginn.
- Wann et e gratis Aarbechter Sellerie ass, gëtt d'Aufgab un et geschéckt; d'Aarbecht, déi Dir am Problem programméiert, fänkt un, mat engem oder aneren Bedreiwer.
Einfach genuch.
Scheduler leeft op de Set vun all DAGs an all Aufgaben bannent DAGs.
Fir de Scheduler mat DAG ze schaffen, muss den DAG e Zäitplang setzen:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Et gëtt e Set vu fäerdege Presets: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Dir kënnt och Cron Ausdréck benotzen:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Ausféierung Datum
Fir ze verstoen wéi Airflow funktionnéiert, ass et wichteg ze verstoen wat Ausféierungsdatum fir en DAG ass. Am Airflow huet DAG eng Ausféierungsdatum Dimensioun, dat heescht, ofhängeg vum DAG säin Aarbechtsplang, ginn Task Instanzen fir all Ausféierungsdatum erstallt. A fir all Ausféierungsdatum kënnen Aufgaben nei ausgefouert ginn - oder, zum Beispill, en DAG kann gläichzäiteg a verschiddenen Ausféierungsdatum schaffen. Dëst ass kloer hei gewisen:
Leider (oder vläicht glécklecherweis: et hänkt vun der Situatioun of), wann d'Ëmsetzung vun der Aufgab am DAG korrigéiert ass, da geet d'Ausféierung am viregten Ausféierungsdatum weider, andeems d'Upassunge berücksichtegt ginn. Dëst ass gutt wann Dir Daten an de vergaangene Perioden mat engem neien Algorithmus nei berechnen musst, awer et ass schlecht well d'Reproduzibilitéit vum Resultat verluer ass (natierlech stéiert keen Iech fir déi erfuerderlech Versioun vum Quellcode vu Git zréckzeginn a berechent wat Dir braucht eng Kéier, wéi Dir et braucht).
Aufgaben ze generéieren
D'Ëmsetzung vun der DAG ass Code am Python, also hu mir e ganz praktesche Wee fir d'Quantitéit vum Code ze reduzéieren wann Dir schafft, zum Beispill mat sharded Quellen. Loosst eis soen datt Dir dräi MySQL-Shards als Quell hutt, Dir musst an all eenzel klammen an e puer Daten ophuelen. Ausserdeem, onofhängeg a parallel. De Python Code am DAG kéint esou ausgesinn:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
Den DAG gesäit esou aus:
An dësem Fall kënnt Dir e Shard addéieren oder ewechhuelen andeems Dir einfach d'Astellungen upasst an den DAG aktualiséieren. Gemittlech!
Dir kënnt och méi komplex Code Generatioun benotzen, zum Beispill, mat Quellen an der Form vun enger Datebank ze schaffen oder eng Tabellstruktur beschreiwen, en Algorithmus fir mat enger Tabell ze schaffen, an, andeems d'Features vun der DWH Infrastruktur berücksichtegt ginn, e Prozess generéieren fir eng Luede N Dëscher an Ärer Stockage. Oder, zum Beispill, mat enger API ze schaffen, déi net mat engem Parameter an der Form vun enger Lëscht schaffen, kënnt Dir N Aufgaben an engem DAG aus dëser Lëscht generéieren, d'Parallelismus vun Ufroen an der API op e Pool limitéieren an d'Schrauwen néideg Daten aus der API. Flexibel!
repository
Airflow huet säin eegene Backend Repository, eng Datebank (kann MySQL oder Postgres sinn, mir hunn Postgres), déi d'Staaten vun Aufgaben, DAGs, Verbindungsastellungen, global Variablen, asw., etc. Hei wéilt ech soen datt de Repository am Airflow ass ganz einfach (ongeféier 20 Dëscher) a praktesch wann Dir iergendeng vun Ären eegene Prozesser uewen opbaue wëllt. Ech erënnere mech un déi 100500 Dëscher am Informatica-Repository, déi fir eng laang Zäit studéiert musse ginn ier se verstanen hunn, wéi een eng Ufro bauen.
Iwwerwaachung
Mat der Einfachheet vum Repository kënnt Dir en Task-Iwwerwaachungsprozess bauen dee fir Iech bequem ass. Mir benotzen en Notizblock am Zeppelin, wou mir de Status vun den Aufgaben kucken:
Dëst kéint och d'Webinterface vum Airflow selwer sinn:
Den Airflow Code ass Open Source, also hu mir Alarm op Telegram bäigefüügt. All Lafen Instanz vun enger Aufgab, wann e Feeler geschitt, spams de Grupp am Telegram, wou d'ganz Entwécklung an Ënnerstëtzung Equipe besteet.
Mir kréien eng prompt Äntwert duerch Telegram (wann néideg), an duerch Zeppelin kréien mir e Gesamtbild vun Aufgaben am Airflow.
Total
Airflow ass haaptsächlech Open Source, an Dir sollt keng Wonner dovun erwaarden. Bereet Iech Zäit an Effort ze setzen fir eng Léisung ze bauen déi funktionnéiert. D'Zil ass erreechbar, gleeft mir, et ass derwäert. Geschwindegkeet vun der Entwécklung, Flexibilitéit, Liichtegkeet fir nei Prozesser ze addéieren - Dir wäert et gär hunn. Natierlech musst Dir vill Opmierksamkeet op d'Organisatioun vum Projet bezuelen, d'Stabilitéit vum Airflow selwer: Wonner geschéien net.
Elo hu mir Airflow all Dag schaffen iwwer 6,5 dausend Aufgaben. Si sinn ganz ënnerschiddlech am Charakter. Et ginn Aufgaben fir Daten an den Haapt-DWH aus ville verschiddene a ganz spezifesche Quellen ze lueden, et ginn Aufgabe fir Storefronts am Haapt-DWH ze berechnen, et ginn Aufgaben fir Daten an e séieren DWH ze publizéieren, et gi vill, vill verschidden Aufgaben - an Airflow kauen se all Dag fir Dag. An Zuelen schwätzen, ass dëst 2,3 Dausend ELT Aufgaben vu variéierter Komplexitéit bannent DWH (Hadoop), ca. 2,5 honnert Datenbanken Quellen, dëst ass eng Equipe aus 4 ETL Entwéckler, déi ënnerdeelt sinn an ETL Datenveraarbechtung an DWH an ELT Datenveraarbechtung bannent DWH an natierlech méi een Admin, déi sech mat der Infrastruktur vum Service beschäftegt.
Pläng fir d'Zukunft
D'Zuel vun de Prozesser wiisst zwangsleefeg, an d'Haapt Saach mir wäerte maachen wat d'Airflow Infrastruktur ugeet ass Skaléieren. Mir wëllen en Airflow-Cluster bauen, e Paar Been fir Sellerieaarbechter verdeelen, an e selbstduplizéierte Kapp mat Aarbechtsplangprozesser an engem Repository maachen.
Epilogue
Dëst ass natierlech net alles wat ech iwwer Airflow erziele géif, awer ech hu probéiert d'Haaptpunkte ze markéieren. Appetit kënnt mam Iessen, probéiert et an Dir wäert et gär hunn :)
Source: will.com