Kaixo, Habr! Artikulu honetan batch datuak prozesatzeko prozesuak garatzeko tresna bikain bati buruz hitz egin nahi dut, adibidez, DWH korporatiboaren edo zure DataLake baten azpiegituran. Apache Airflow (aurrerantzean Airflow) buruz hitz egingo dugu. Habré-ri bidegabeko arreta kentzen zaio, eta zati nagusian saiatuko naiz konbentzitzen gutxienez Airflow-ek zure ETL/ELT prozesuetarako programatzaile bat aukeratzerakoan begiratzea merezi duela.
Aurretik, DWH gaiari buruzko artikulu sorta bat idatzi nuen Tinkoff Bank-en lan egiten nuenean. Orain Mail.Ru Taldeko taldean sartu naiz eta joko-eremuan datuak aztertzeko plataforma bat garatzen ari naiz. Egia esan, albisteak eta irtenbide interesgarriak agertzen diren heinean, nire taldeak eta biok hemen hitz egingo dugu datuen analisirako gure plataformari buruz.
Hitzaurrea
Beraz, has gaitezen. Zer da Airflow? Hau liburutegi bat da (edo
Orain ikus ditzagun Airflow-en entitate nagusiak. Haien funtsa eta helburua ulertuta, zure prozesu-arkitektura modu egokian antola dezakezu. Beharbada, entitate nagusia Grafiko Azikliko Zuzendua da (aurrerantzean DAG deitua).
DAG
DAG bat zure zereginen asoziazio esanguratsu bat da, zorrozki definitutako sekuentzia batean bete nahi dituzun ordutegi zehatz baten arabera. Airflow-ek web interfaze erosoa eskaintzen du DAGekin eta beste entitate batzuekin lan egiteko:
DAG-a honelakoa izan daiteke:
Garatzaileak, DAG bat diseinatzerakoan, DAG barruan eginkizunak eraikiko diren operadore multzo bat ezartzen du. Hemen beste entitate garrantzitsu batera iritsiko gara: Airflow Operator.
Operadore
Operadore bat lan-instantziaren oinarrian sortzen diren entitate bat da, lan-instantzia bat exekutatzen denean zer gertatuko den deskribatzen duena.
- BashOperator - bash komando bat exekutatzeko operadorea.
- PythonOperator - Python kodea deitzeko operadorea.
- EmailOperator — posta elektronikoa bidaltzeko operadorea.
- HTTPOperator - http eskaerekin lan egiteko operadorea.
- SqlOperator - SQL kodea exekutatzeko operadorea.
- Sentsore bat gertaera baten zain egoteko operadorea da (beharrezko ordua iristea, eskatutako fitxategia agertzea, datu-baseko lerro bat, APIaren erantzuna, etab., etab.).
Eragile zehatzagoak daude: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Zure ezaugarrietan oinarritutako operadoreak ere garatu ditzakezu eta zure proiektuan erabil ditzakezu. Adibidez, MongoDBToHiveViaHdfsTransfer sortu dugu, MongoDB-tik Hive-ra dokumentuak esportatzeko operadorea eta hainbat eragilerekin lan egiteko.
Ondoren, zereginen instantzia hauek guztiak exekutatu behar dira, eta orain programatzaileari buruz hitz egingo dugu.
Antolatzailea
Airflow-en zereginen antolatzailea eraikita dago
Igerileku bakoitzak zirrikitu kopuruaren muga du. DAG bat sortzean, multzo bat ematen zaio:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
DAG mailan definitutako multzo bat zeregin mailan gainidatzi daiteke.
Prozesu bereizi bat, Scheduler, Airflow-en zeregin guztiak antolatzeaz arduratzen da. Egia esan, Scheduler-ek exekutatzeko zereginak ezartzeko mekanika guztiak lantzen ditu. Zereginak hainbat fase igarotzen ditu exekutatu aurretik:
- Aurreko zereginak DAGn burutu dira; berri bat ilaran jar daiteke.
- Ilara zereginen lehentasunaren arabera ordenatzen da (lehentasunak ere kontrola daitezke), eta igerilekuan zirrikitua libre badago, zeregina martxan jar daiteke.
- Doako langile apioa badago, hari bidaltzen zaio zeregina; arazoan programatu duzun lana hasten da, operadore bat edo beste erabiliz.
Nahikoa sinplea.
Scheduler DAG guztien eta DAGen barruan dauden zeregin guztietan exekutatzen da.
Scheduler DAG-ekin lanean hasteko, DAG-k ordutegi bat ezarri behar du:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Prest eginiko aurrezarpen multzo bat dago: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Cron adierazpenak ere erabil ditzakezu:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Gauzatze Data
Airflow nola funtzionatzen duen ulertzeko, garrantzitsua da DAG baten Exekuzio Data zer den ulertzea. Airflow-en, DAG-ek Exekuzio-data dimentsio bat du, hau da, DAG-en lan-egutegiaren arabera, zeregin-instantziak sortzen dira Exekuzio-data bakoitzerako. Eta Exekuzio Data bakoitzeko, zereginak berriro exekutatu daitezke edo, adibidez, DAG batek aldi berean lan egin dezake hainbat Exekuzio Datatan. Hau argi eta garbi erakusten da hemen:
Zoritxarrez (edo agian zorionez: egoeraren araberakoa da), DAGn zereginaren ezarpena zuzentzen bada, aurreko Exekuzio Datan exekuzioak aurrera egingo du doikuntzak kontuan hartuta. Hau ona da iraganeko aldietako datuak algoritmo berri bat erabiliz birkalkulatu behar badituzu, baina txarra da emaitzaren erreproduzigarritasuna galtzen delako (noski, inork ez zaitu molestatzen Git-etik iturburu-kodearen behar den bertsioa itzultzeko eta zer kalkulatzeko. denbora bat behar duzu, behar duzun moduan).
Zereginak sortzea
DAG-en inplementazioa kodea da Python-en, beraz, kode kopurua murrizteko modu oso erosoa dugu, adibidez, zatitutako iturriekin lan egitean. Demagun MySQL hiru zati dituzula iturri gisa, bakoitzean igo eta datu batzuk jaso behar dituzu. Gainera, modu independentean eta paraleloan. DAG-en Python kodea honelakoa izan daiteke:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG itxura hau du:
Kasu honetan, zati bat gehitu edo kendu dezakezu ezarpenak doituz eta DAG eguneratuz. Eroso!
Kode-sorkuntza konplexuagoa ere erabil dezakezu, adibidez, iturriekin datu-base moduan lan egin edo taula-egitura bat deskribatu, taula batekin lan egiteko algoritmo bat eta, DWH azpiegituraren ezaugarriak kontuan hartuta, prozesu bat sortzea. N taula zure biltegian kargatzeko. Edo, adibidez, zerrenda moduan parametro batekin lan egitea onartzen ez duen API batekin lan eginez, N zeregin sor ditzakezu DAG batean zerrenda horretatik, APIko eskaeren paralelismoa multzo batera mugatu eta scrape. APItik beharrezko datuak. Malgua!
biltegia
Airflow-ek bere backend biltegi propioa du, datu-base bat (MySQL edo Postgres izan daiteke, Postgres dugu), zereginen egoerak, DAGak, konexio ezarpenak, aldagai globalak eta abar gordetzen dituena. Hemen esan nahiko nuke Airflow-en biltegia oso erraza da (20 taula inguru) eta erosoa da haren gainean zure prozesuren bat eraiki nahi baduzu. Gogoan ditut Informatica biltegian dauden 100500 taulak, kontsulta bat nola eraiki ulertu aurretik denbora luzez aztertu behar izan zirenak.
jarraipenaren
Biltegiaren sinpletasuna kontuan hartuta, zuretzat komenigarria den zereginen jarraipena egiteko prozesu bat eraiki dezakezu. Zeppelin-en koaderno bat erabiltzen dugu, non zereginen egoera aztertzen dugun:
Hau ere izan daiteke Airflow beraren web interfazea:
Airflow kodea kode irekikoa da, beraz, alertak gehitu ditugu Telegram-en. Zeregin baten exekutatzen ari den instantzia bakoitzak, erroreren bat gertatzen bada, taldeari spam egiten dio Telegramen, non garapen eta laguntza talde osoa osatzen duen.
Telegramen bidez erantzun azkar bat jasotzen dugu (beharrezkoa bada), eta Zeppelinen bidez Airflow-eko zereginen irudi orokorra jasotzen dugu.
Guztira
Airflow kode irekia da nagusiki, eta ez zenuke hortik miraririk espero behar. Prest egon denbora eta ahalegina egiteko funtzionatzen duen irtenbide bat eraikitzeko. Helburua lorgarria da, sinets iezadazu, merezi du. Garapen-abiadura, malgutasuna, prozesu berriak gehitzeko erraztasuna - gustatuko zaizu. Jakina, arreta handia jarri behar diozu proiektuaren antolaketari, Airflow-aren egonkortasunari berari: mirariak ez dira gertatzen.
Orain Airflow egunero funtzionatzen dugu 6,5 mila zeregin inguru. Izaeraz nahiko desberdinak dira. Datuak DWH nagusira kargatzeko zereginak daude iturri ezberdin eta oso zehatzetatik, DWH nagusiaren barruan erakusleihoak kalkulatzeko zereginak daude, datuak DWH azkar batean argitaratzeko lanak daude, hainbat eta hainbat zeregin daude - eta Airflow denak murtxikatzen ditu egunez egun. Zenbakitan hitz eginda, hau da 2,3 mila DWH (Hadoop) barruan konplexutasun ezberdineko ELT zereginak, gutxi gorabehera. 2,5 ehun datu-base iturriak, hau talde bat da 4 ETL garatzaile, ETL datuen prozesamenduan DWHn eta ELT datuen prozesamenduan DWH barruan banatzen direnak eta, noski, gehiago administratzaile bat, zerbitzuaren azpiegituraz arduratzen dena.
Etorkizunerako planak
Prozesu kopurua hazten ari da ezinbestean, eta Airflow azpiegiturari dagokionez egingo dugun gauza nagusia eskalatzea da. Airflow kluster bat eraiki nahi dugu, Apioaren langileentzako hanka pare bat esleitu eta buru autobikoiztu bat egin nahi dugu lanak antolatzeko prozesuekin eta biltegi batekin.
Epilogoa
Hau, noski, ez da Airflow-ari buruz kontatu nahiko nukeen guztia, baina puntu nagusiak nabarmentzen saiatu naiz. Gosea jatearekin dator, probatu eta gustatuko zaizu :)
Iturria: www.habr.com