Aerfluo estas ilo por komforte kaj rapide disvolvi kaj konservi procesojn de bataj datumtraktadoj

Aerfluo estas ilo por komforte kaj rapide disvolvi kaj konservi procesojn de bataj datumtraktadoj

Hej Habr! En ĉi tiu artikolo, mi volas paroli pri unu bonega ilo por disvolvi procedojn pri bataj datumtraktadoj, ekzemple en la infrastrukturo de kompania DWH aŭ via DataLake. Ni parolos pri Apache Airflow (ĉi-poste nomata Airflow). Li estas maljuste senigita de atento pri Habré, kaj en la ĉefa parto mi provos konvinki vin, ke almenaŭ Airflow indas rigardi, kiam vi elektas planilon por viaj ETL / ELT-procezoj.

Antaŭe, mi skribis serion da artikoloj pri la temo de DWH kiam mi laboris ĉe Tinkoff Bank. Nun mi fariĝis parto de la teamo Mail.Ru Group kaj disvolvas platformon por analizo de datumoj en la videoludada areo. Efektive, ĉar novaĵoj kaj interesaj solvoj aperas, la teamo kaj mi parolos ĉi tie pri nia platformo por datuma analizo.

Antaŭparolo

Do, ni komencu. Kio estas Aerfluo? Ĉi tio estas biblioteko (aŭ aro de bibliotekoj) evoluigi, plani kaj kontroli laborfluojn. La ĉefa trajto de Airflow estas, ke Python-kodo estas uzata por priskribi (evoluigi) procezojn. Ĉi tio havas multajn avantaĝojn por organizi vian projekton kaj disvolviĝon: fakte via (ekzemple) ETL-projekto estas nur Python-projekto, kaj vi povas organizi ĝin laŭplaĉe, konsiderante infrastrukturajn funkciojn, teamgrandecon kaj aliajn postulojn. Instrumente ĉio estas simpla. Uzu ekzemple PyCharm + Git. Ĝi estas bonega kaj tre oportuna!

Nun ni rigardu la ĉefajn estaĵojn de Airflow. Kompreninte ilian esencon kaj celon, vi optimume organizos la procezan arkitekturon. Eble la ĉefa ento estas la Direktita Acikla Grafiko (ĉi-poste DAG).

DAG

DAG estas iu semantika asocio de viaj taskoj, kiujn vi volas plenumi en strikte difinita sinsekvo laŭ specifa horaro. Aerfluo prezentas oportunan retan interfacon por labori kun DAG-oj kaj aliaj estaĵoj:

Aerfluo estas ilo por komforte kaj rapide disvolvi kaj konservi procesojn de bataj datumtraktadoj

DAG povus aspekti jene:

Aerfluo estas ilo por komforte kaj rapide disvolvi kaj konservi procesojn de bataj datumtraktadoj

Dum dizajnado de DAG, programisto metas aron da funkciigistoj sur kiuj taskoj ene de la DAG estos konstruitaj. Jen ni venas al alia grava ento: la Aerflua Operaciisto.

Telefonistoj

Operatoro estas ento surbaze de kiuj estas kreitaj laborinstancoj, kiu priskribas kio okazos dum la ekzekuto de laborkazo. Aerfluo liberiĝas de GitHub jam enhavas aron da deklaroj pretaj por esti uzataj. Ekzemploj:

  • BashOperator estas funkciigisto por efektivigi bash-komandon.
  • PythonOperator estas funkciigisto por voki Python-kodon.
  • EmailOperator - funkciigisto por sendi retpoŝton.
  • HTTPOperator - operatoro por labori kun http-petoj.
  • SqlOperator estas funkciigisto por ekzekuti SQL-kodon.
  • Sensilo estas funkciigisto por atendi eventon (la alveno de la dezirata tempo, la apero de la bezonata dosiero, vico en la datumbazo, respondo de la API, ktp., ktp.).

Estas pli specifaj funkciigistoj: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Vi ankaŭ povas evoluigi funkciigistojn laŭ viaj bezonoj kaj uzi ilin en via projekto. Ekzemple, ni kreis MongoDBToHiveViaHdfsTransfer, funkciigiston por eksporti dokumentojn de MongoDB al Hive, kaj plurajn funkciigistojn por labori kun KlakuDomo: CHLoadFromHiveOperator kaj CHTableLoaderOperator. Fakte, tuj kiam projekto ofte uzis kodon konstruitan sur bazaj deklaroj, vi povas pensi pri kompili ĝin en novan deklaron. Ĉi tio simpligos pluan disvolviĝon, kaj vi aldonos al via biblioteko de funkciigistoj en la projekto.

Plue, ĉiuj ĉi tiuj okazoj de taskoj devas esti plenumitaj, kaj nun ni parolos pri la planilo.

Planilo

La taskoplanilo en Airflow estas konstruita sur Celerio. Celery estas Python-biblioteko, kiu ebligas al vi organizi vicon kaj nesinkronan kaj distribuitan plenumon de taskoj. De la flanko de Airflow, ĉiuj taskoj estas dividitaj en naĝejojn. Naĝejoj estas kreitaj permane. Kiel regulo, ilia celo estas limigi la ŝarĝon labori kun la fonto aŭ tajpi taskojn ene de la DWH. Naĝejoj povas esti administritaj per la retinterfaco:

Aerfluo estas ilo por komforte kaj rapide disvolvi kaj konservi procesojn de bataj datumtraktadoj

Ĉiu naĝejo havas limon por la nombro da fendoj. Dum kreado de DAG, ĝi ricevas naĝejon:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

La naĝejo metita ĉe la DAG-nivelo povas esti forigita ĉe la taskonivelo.
Aparta procezo, Scheduler, respondecas pri planado de ĉiuj taskoj en Airflow. Fakte, la Scheduler traktas ĉiujn mekanikojn por agordi taskojn por ekzekuto. Tasko pasas tra pluraj stadioj antaŭ esti efektivigita:

  1. Antaŭaj taskoj estis kompletigitaj en la DAG, nova povas esti vicigita.
  2. La vico estas ordigita laŭ la prioritato de taskoj (prioritatoj ankaŭ povas esti kontrolitaj), kaj se ekzistas libera fendo en la naĝejo, la tasko povas esti prenita por funkcii.
  3. Se estas libera laborista celerio, la tasko estas sendita al ĝi; komenciĝas la laboro, kiun vi programis en la tasko, uzante unu aŭ alian funkciigiston.

Sufiĉe simpla.

La Planilo funkcias per aro de ĉiuj DAG-oj kaj ĉiuj taskoj ene de DAG-oj.

Por ke la Planilo komencu labori kun la DAG, la DAG devas agordi horaron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Estas aro de pretaj antaŭaj agordoj: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Vi ankaŭ povas uzi cron-esprimojn:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Dato de Ekzekuto

Por kompreni kiel funkcias Aerfluo, gravas kompreni, kio estas Ekzekuta Dato por DAG. La Airflow DAG havas la Execution Date dimension, t.e., depende de la laborhoraro de la DAG, taskokazoj estas kreitaj por ĉiu Execution Date. Kaj por ĉiu Ekzekuta Dato, taskoj povas esti reekzekutaj - aŭ, ekzemple, DAG povas funkcii samtempe en pluraj Ekzekutaj Datoj. Ĉi tio estas klare montrita ĉi tie:

Aerfluo estas ilo por komforte kaj rapide disvolvi kaj konservi procesojn de bataj datumtraktadoj

Bedaŭrinde (aŭ eble feliĉe: dependas de la situacio), se la efektivigo de la tasko en la DAG estas ĝusta, tiam la ekzekuto en la antaŭa Ekzekuta Dato iros kun la ĝustigoj. Ĉi tio estas bona se vi bezonas rekalkuli datumojn en pasintaj periodoj uzante novan algoritmon, sed ĝi estas malbona ĉar la reproduktebleco de la rezulto estas perdita (kompreneble, neniu ĝenas redoni la bezonatan version de la fontkodo de Git kaj kalkuli kion vi bezonas unufoje, laŭbezone).

Taskogenerado

La DAG-efektivigo estas Python-kodo, do ni havas tre oportunan manieron redukti la kvanton da kodo kiam oni laboras, ekzemple, kun sharded fontoj. Supozu, ke vi havas tri MySQL-pecetojn kiel fonton, vi devas grimpi en ĉiun kaj preni iujn datumojn. Kaj sendepende kaj paralele. La Python-kodo en la DAG povus aspekti jene:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

La DAG aspektas jene:

Aerfluo estas ilo por komforte kaj rapide disvolvi kaj konservi procesojn de bataj datumtraktadoj

Samtempe, vi povas aldoni aŭ forigi peceton simple ĝustigante la agordon kaj ĝisdatigante la DAG. Komforta!

Vi ankaŭ povas uzi pli kompleksan kodon, ekzemple, labori kun fontoj en formo de datumbazo aŭ priskribi tabelan strukturon, algoritmon por labori kun tabelo, kaj, konsiderante la trajtojn de la DWH-infrastrukturo, generi la procezon. ŝarĝi N-tabelojn en vian stokadon. Aŭ, ekzemple, laborante kun API, kiu ne subtenas labori kun parametro en formo de listo, vi povas generi N taskojn en la DAG uzante ĉi tiun liston, limigi la paralelecon de petoj en la API al naĝejo kaj ĉerpi. la necesajn datumojn de la API. Fleksebla!

deponejo

Airflow havas sian propran backend-deponejon, datumbazon (eble MySQL aŭ Postgres, ni havas Postgres), kiu konservas la statojn de taskoj, DAG-oj, konekto-agordojn, tutmondajn variablojn ktp., ktp. Ĉi tie mi ŝatus diri, ke la deponejo. en Airflow estas tre simpla (ĉirkaŭ 20 tabloj) kaj oportuna se vi volas konstrui iun el viaj procezoj sur ĝi. Mi memoras 100500 XNUMX tabelojn en la deponejo de Informatica, kiujn oni devis longe fumi antaŭ ol kompreni kiel konstrui demandon.

Monitorado

Konsiderante la simplecon de la deponejo, vi povas konstrui procezon por monitorado de taskoj, kiu taŭgas por vi. Ni uzas notblokon en Zeppelin, kie ni rigardas la staton de taskoj:

Aerfluo estas ilo por komforte kaj rapide disvolvi kaj konservi procesojn de bataj datumtraktadoj

Ĝi ankaŭ povas esti la retinterfaco de Airflow mem:

Aerfluo estas ilo por komforte kaj rapide disvolvi kaj konservi procesojn de bataj datumtraktadoj

La Airflow-kodo estas malfermita, do ni aldonis alarmon en Telegramo. Ĉiu funkcianta tasko, se okazas eraro, spamas al la Telegram-grupo, kie konsistas la tuta evolua kaj subtena teamo.

Ni ricevas rapidan respondon per Telegramo (se necesas), per Zeppelin - ĝenerala bildo de la taskoj en Airflow.

Tuta

Aerfluo estas unue kaj ĉefe malfermfonta, kaj ne atendu miraklojn de ĝi. Estu preta enmeti la tempon kaj penon por konstrui funkciantan solvon. Celo el la kategorio de atingebla, kredu min, indas. Disvolva rapideco, fleksebleco, facileco aldoni novajn procezojn - vi amos ĝin. Kompreneble, vi devas multe atenti la organizon de la projekto, la stabilecon de la laboro de Airflow mem: ne ekzistas mirakloj.

Nun ni ĉiutage funkcias Airflow ĉirkaŭ 6,5 mil taskoj. Ili estas sufiĉe malsamaj en naturo. Estas taskoj por ŝargi datumojn en la ĉefan DWH el multaj malsamaj kaj tre specifaj fontoj, estas taskoj por kalkuli butikfasadojn ene de la ĉefa DWH, estas taskoj por publikigi datumojn en rapidan DWH, estas multaj, multaj malsamaj taskoj - kaj Airflow. maĉas ilin la tutan tagon post tago. Parolante en nombroj, jen 2,3 mil ELT-taskoj de diversa komplekseco ene de DWH (Hadoop), pri 2,5 cent datumbazoj fontoj, ĉi tio estas komando de 4 ETL-programistoj, kiuj estas dividitaj en ETL-datumtraktadon en DWH kaj ELT-datumtraktado ene de DWH kaj kompreneble pli unu administranto, kiu traktas la infrastrukturon de la servo.

Planoj por la estonteco

La nombro da procezoj neeviteble kreskas, kaj la ĉefa afero, kiun ni faros laŭ la infrastrukturo de Airflow, estas grimpi. Ni volas konstrui Aerfluan areton, asigni kelkajn krurojn por Celery-laboristoj kaj fari duoblan kapon kun laborplanaj procezoj kaj deponejo.

Epilogo

Ĉi tio, kompreneble, estas malproksima de ĉio, kion mi ŝatus paroli pri Airflow, sed mi provis reliefigi la ĉefajn punktojn. Apetito venas kun manĝado, provu ĝin kaj vi ŝatos ĝin 🙂

fonto: www.habr.com

Aldoni komenton