Airflow batch datuak prozesatzeko prozesuak eroso eta azkar garatzeko eta mantentzeko tresna da

Airflow batch datuak prozesatzeko prozesuak eroso eta azkar garatzeko eta mantentzeko tresna da

Kaixo, Habr! Artikulu honetan batch datuak prozesatzeko prozesuak garatzeko tresna bikain bati buruz hitz egin nahi dut, adibidez, DWH korporatiboaren edo zure DataLake baten azpiegituran. Apache Airflow (aurrerantzean Airflow) buruz hitz egingo dugu. Habré-ri bidegabeko arreta kentzen zaio, eta zati nagusian saiatuko naiz konbentzitzen gutxienez Airflow-ek zure ETL/ELT prozesuetarako programatzaile bat aukeratzerakoan begiratzea merezi duela.

Aurretik, DWH gaiari buruzko artikulu sorta bat idatzi nuen Tinkoff Bank-en lan egiten nuenean. Orain Mail.Ru Taldeko taldean sartu naiz eta joko-eremuan datuak aztertzeko plataforma bat garatzen ari naiz. Egia esan, albisteak eta irtenbide interesgarriak agertzen diren heinean, nire taldeak eta biok hemen hitz egingo dugu datuen analisirako gure plataformari buruz.

Hitzaurrea

Beraz, has gaitezen. Zer da Airflow? Hau liburutegi bat da (edo liburutegi multzoa) lan-prozesuak garatzea, planifikatzea eta kontrolatzea. Airflow-en ezaugarri nagusia: Python kodea prozesuak deskribatzeko (garatzeko) erabiltzen da. Honek abantaila asko ditu zure proiektua antolatzeko eta garapenerako: funtsean, zure (adibidez) ETL proiektua Python proiektu bat besterik ez da, eta nahi duzun moduan antola dezakezu, azpiegituraren berezitasunak, taldearen tamaina eta beste eskakizun batzuk. Instrumentalki dena sinplea da. Erabili adibidez PyCharm + Git. Zoragarria eta oso erosoa da!

Orain ikus ditzagun Airflow-en entitate nagusiak. Haien funtsa eta helburua ulertuta, zure prozesu-arkitektura modu egokian antola dezakezu. Beharbada, entitate nagusia Grafiko Azikliko Zuzendua da (aurrerantzean DAG deitua).

DAG

DAG bat zure zereginen asoziazio esanguratsu bat da, zorrozki definitutako sekuentzia batean bete nahi dituzun ordutegi zehatz baten arabera. Airflow-ek web interfaze erosoa eskaintzen du DAGekin eta beste entitate batzuekin lan egiteko:

Airflow batch datuak prozesatzeko prozesuak eroso eta azkar garatzeko eta mantentzeko tresna da

DAG-a honelakoa izan daiteke:

Airflow batch datuak prozesatzeko prozesuak eroso eta azkar garatzeko eta mantentzeko tresna da

Garatzaileak, DAG bat diseinatzerakoan, DAG barruan eginkizunak eraikiko diren operadore multzo bat ezartzen du. Hemen beste entitate garrantzitsu batera iritsiko gara: Airflow Operator.

Operadore

Operadore bat lan-instantziaren oinarrian sortzen diren entitate bat da, lan-instantzia bat exekutatzen denean zer gertatuko den deskribatzen duena. GitHub-etik aire-fluxua atera da dagoeneko erabiltzeko prest dauden operadore multzo bat daukate. Adibideak:

  • BashOperator - bash komando bat exekutatzeko operadorea.
  • PythonOperator - Python kodea deitzeko operadorea.
  • EmailOperator — posta elektronikoa bidaltzeko operadorea.
  • HTTPOperator - http eskaerekin lan egiteko operadorea.
  • SqlOperator - SQL kodea exekutatzeko operadorea.
  • Sentsore bat gertaera baten zain egoteko operadorea da (beharrezko ordua iristea, eskatutako fitxategia agertzea, datu-baseko lerro bat, APIaren erantzuna, etab., etab.).

Eragile zehatzagoak daude: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Zure ezaugarrietan oinarritutako operadoreak ere garatu ditzakezu eta zure proiektuan erabil ditzakezu. Adibidez, MongoDBToHiveViaHdfsTransfer sortu dugu, MongoDB-tik Hive-ra dokumentuak esportatzeko operadorea eta hainbat eragilerekin lan egiteko. clickhouse: CHLoadFromHiveOperator eta CHTableLoaderOperator. Funtsean, proiektu batek oinarrizko adierazpenetan eraikitako kodea maiz erabili bezain laster, adierazpen berri batean eraikitzea pentsa dezakezu. Honek garapen gehiago erraztuko du, eta zure operadoreen liburutegia zabalduko duzu proiektuan.

Ondoren, zereginen instantzia hauek guztiak exekutatu behar dira, eta orain programatzaileari buruz hitz egingo dugu.

Antolatzailea

Airflow-en zereginen antolatzailea eraikita dago Apioa. Celery Python liburutegi bat da, eta ilara bat eta zereginen exekuzio asinkrono eta banatua antolatzeko aukera ematen du. Airflow aldean, zeregin guztiak igerilekuetan banatzen dira. Igerilekuak eskuz sortzen dira. Normalean, haien helburua iturriarekin lan egitearen lan-karga mugatzea edo DWHren barruan zereginak tipifikatzea da. Igerilekuak web interfazearen bidez kudeatu daitezke:

Airflow batch datuak prozesatzeko prozesuak eroso eta azkar garatzeko eta mantentzeko tresna da

Igerileku bakoitzak zirrikitu kopuruaren muga du. DAG bat sortzean, multzo bat ematen zaio:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

DAG mailan definitutako multzo bat zeregin mailan gainidatzi daiteke.
Prozesu bereizi bat, Scheduler, Airflow-en zeregin guztiak antolatzeaz arduratzen da. Egia esan, Scheduler-ek exekutatzeko zereginak ezartzeko mekanika guztiak lantzen ditu. Zereginak hainbat fase igarotzen ditu exekutatu aurretik:

  1. Aurreko zereginak DAGn burutu dira; berri bat ilaran jar daiteke.
  2. Ilara zereginen lehentasunaren arabera ordenatzen da (lehentasunak ere kontrola daitezke), eta igerilekuan zirrikitua libre badago, zeregina martxan jar daiteke.
  3. Doako langile apioa badago, hari bidaltzen zaio zeregina; arazoan programatu duzun lana hasten da, operadore bat edo beste erabiliz.

Nahikoa sinplea.

Scheduler DAG guztien eta DAGen barruan dauden zeregin guztietan exekutatzen da.

Scheduler DAG-ekin lanean hasteko, DAG-k ordutegi bat ezarri behar du:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Prest eginiko aurrezarpen multzo bat dago: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Cron adierazpenak ere erabil ditzakezu:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Gauzatze Data

Airflow nola funtzionatzen duen ulertzeko, garrantzitsua da DAG baten Exekuzio Data zer den ulertzea. Airflow-en, DAG-ek Exekuzio-data dimentsio bat du, hau da, DAG-en lan-egutegiaren arabera, zeregin-instantziak sortzen dira Exekuzio-data bakoitzerako. Eta Exekuzio Data bakoitzeko, zereginak berriro exekutatu daitezke edo, adibidez, DAG batek aldi berean lan egin dezake hainbat Exekuzio Datatan. Hau argi eta garbi erakusten da hemen:

Airflow batch datuak prozesatzeko prozesuak eroso eta azkar garatzeko eta mantentzeko tresna da

Zoritxarrez (edo agian zorionez: egoeraren araberakoa da), DAGn zereginaren ezarpena zuzentzen bada, aurreko Exekuzio Datan exekuzioak aurrera egingo du doikuntzak kontuan hartuta. Hau ona da iraganeko aldietako datuak algoritmo berri bat erabiliz birkalkulatu behar badituzu, baina txarra da emaitzaren erreproduzigarritasuna galtzen delako (noski, inork ez zaitu molestatzen Git-etik iturburu-kodearen behar den bertsioa itzultzeko eta zer kalkulatzeko. denbora bat behar duzu, behar duzun moduan).

Zereginak sortzea

DAG-en inplementazioa kodea da Python-en, beraz, kode kopurua murrizteko modu oso erosoa dugu, adibidez, zatitutako iturriekin lan egitean. Demagun MySQL hiru zati dituzula iturri gisa, bakoitzean igo eta datu batzuk jaso behar dituzu. Gainera, modu independentean eta paraleloan. DAG-en Python kodea honelakoa izan daiteke:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG itxura hau du:

Airflow batch datuak prozesatzeko prozesuak eroso eta azkar garatzeko eta mantentzeko tresna da

Kasu honetan, zati bat gehitu edo kendu dezakezu ezarpenak doituz eta DAG eguneratuz. Eroso!

Kode-sorkuntza konplexuagoa ere erabil dezakezu, adibidez, iturriekin datu-base moduan lan egin edo taula-egitura bat deskribatu, taula batekin lan egiteko algoritmo bat eta, DWH azpiegituraren ezaugarriak kontuan hartuta, prozesu bat sortzea. N taula zure biltegian kargatzeko. Edo, adibidez, zerrenda moduan parametro batekin lan egitea onartzen ez duen API batekin lan eginez, N zeregin sor ditzakezu DAG batean zerrenda horretatik, APIko eskaeren paralelismoa multzo batera mugatu eta scrape. APItik beharrezko datuak. Malgua!

biltegia

Airflow-ek bere backend biltegi propioa du, datu-base bat (MySQL edo Postgres izan daiteke, Postgres dugu), zereginen egoerak, DAGak, konexio ezarpenak, aldagai globalak eta abar gordetzen dituena. Hemen esan nahiko nuke Airflow-en biltegia oso erraza da (20 taula inguru) eta erosoa da haren gainean zure prozesuren bat eraiki nahi baduzu. Gogoan ditut Informatica biltegian dauden 100500 taulak, kontsulta bat nola eraiki ulertu aurretik denbora luzez aztertu behar izan zirenak.

jarraipenaren

Biltegiaren sinpletasuna kontuan hartuta, zuretzat komenigarria den zereginen jarraipena egiteko prozesu bat eraiki dezakezu. Zeppelin-en koaderno bat erabiltzen dugu, non zereginen egoera aztertzen dugun:

Airflow batch datuak prozesatzeko prozesuak eroso eta azkar garatzeko eta mantentzeko tresna da

Hau ere izan daiteke Airflow beraren web interfazea:

Airflow batch datuak prozesatzeko prozesuak eroso eta azkar garatzeko eta mantentzeko tresna da

Airflow kodea kode irekikoa da, beraz, alertak gehitu ditugu Telegram-en. Zeregin baten exekutatzen ari den instantzia bakoitzak, erroreren bat gertatzen bada, taldeari spam egiten dio Telegramen, non garapen eta laguntza talde osoa osatzen duen.

Telegramen bidez erantzun azkar bat jasotzen dugu (beharrezkoa bada), eta Zeppelinen bidez Airflow-eko zereginen irudi orokorra jasotzen dugu.

Guztira

Airflow kode irekia da nagusiki, eta ez zenuke hortik miraririk espero behar. Prest egon denbora eta ahalegina egiteko funtzionatzen duen irtenbide bat eraikitzeko. Helburua lorgarria da, sinets iezadazu, merezi du. Garapen-abiadura, malgutasuna, prozesu berriak gehitzeko erraztasuna - gustatuko zaizu. Jakina, arreta handia jarri behar diozu proiektuaren antolaketari, Airflow-aren egonkortasunari berari: mirariak ez dira gertatzen.

Orain Airflow egunero funtzionatzen dugu 6,5 mila zeregin inguru. Izaeraz nahiko desberdinak dira. Datuak DWH nagusira kargatzeko zereginak daude iturri ezberdin eta oso zehatzetatik, DWH nagusiaren barruan erakusleihoak kalkulatzeko zereginak daude, datuak DWH azkar batean argitaratzeko lanak daude, hainbat eta hainbat zeregin daude - eta Airflow denak murtxikatzen ditu egunez egun. Zenbakitan hitz eginda, hau da 2,3 mila DWH (Hadoop) barruan konplexutasun ezberdineko ELT zereginak, gutxi gorabehera. 2,5 ehun datu-base iturriak, hau talde bat da 4 ETL garatzaile, ETL datuen prozesamenduan DWHn eta ELT datuen prozesamenduan DWH barruan banatzen direnak eta, noski, gehiago administratzaile bat, zerbitzuaren azpiegituraz arduratzen dena.

Etorkizunerako planak

Prozesu kopurua hazten ari da ezinbestean, eta Airflow azpiegiturari dagokionez egingo dugun gauza nagusia eskalatzea da. Airflow kluster bat eraiki nahi dugu, Apioaren langileentzako hanka pare bat esleitu eta buru autobikoiztu bat egin nahi dugu lanak antolatzeko prozesuekin eta biltegi batekin.

Epilogoa

Hau, noski, ez da Airflow-ari buruz kontatu nahiko nukeen guztia, baina puntu nagusiak nabarmentzen saiatu naiz. Gosea jatearekin dator, probatu eta gustatuko zaizu :)

Iturria: www.habr.com

Gehitu iruzkin berria