Mae llif aer yn offeryn i ddatblygu a chynnal prosesau prosesu data swp yn gyfleus ac yn gyflym

Mae llif aer yn offeryn i ddatblygu a chynnal prosesau prosesu data swp yn gyfleus ac yn gyflym

Hei Habr! Yn yr erthygl hon, rwyf am siarad am un offeryn gwych ar gyfer datblygu prosesau prosesu data swp, er enghraifft, yn seilwaith DWH corfforaethol neu eich DataLake. Byddwn yn siarad am Apache Airflow (y cyfeirir ati yma wedi hyn fel Airflow). Mae'n cael ei amddifadu'n annheg o sylw ar Habré, ac yn y prif ran byddaf yn ceisio eich argyhoeddi bod o leiaf Airflow yn werth edrych arno wrth ddewis trefnydd ar gyfer eich prosesau ETL / ELT.

Yn flaenorol, ysgrifennais gyfres o erthyglau ar y pwnc DWH pan oeddwn yn gweithio yn Tinkoff Bank. Nawr rwyf wedi dod yn rhan o dîm Mail.Ru Group ac yn datblygu llwyfan ar gyfer dadansoddi data yn yr ardal hapchwarae. A dweud y gwir, wrth i newyddion ac atebion diddorol ymddangos, bydd y tîm a minnau'n siarad yma am ein platfform ar gyfer dadansoddi data.

Prologue

Felly, gadewch i ni ddechrau. Beth yw Llif Awyr? Llyfrgell yw hon (neu set o lyfrgelloedd) datblygu, cynllunio a monitro llifoedd gwaith. Prif nodwedd Airflow yw bod cod Python yn cael ei ddefnyddio i ddisgrifio (datblygu) prosesau. Mae gan hyn lawer o fanteision ar gyfer trefnu eich prosiect a'ch datblygiad: mewn gwirionedd, dim ond prosiect Python yw eich prosiect ETL (er enghraifft), a gallwch ei drefnu fel y dymunwch, gan ystyried nodweddion seilwaith, maint tîm, a gofynion eraill . Yn offerynnol, mae popeth yn syml. Defnyddiwch er enghraifft PyCharm + Git. Mae'n wych ac yn gyfleus iawn!

Nawr, gadewch i ni edrych ar brif endidau Airflow. Ar ôl deall eu hanfod a'u pwrpas, byddwch yn trefnu pensaernïaeth y broses yn y ffordd orau bosibl. Efallai mai'r prif endid yw'r Graff Acyclic Cyfeiriedig (DAG o hyn ymlaen).

DAG

Mae DAG yn rhyw gysylltiad semantig o'ch tasgau rydych chi am eu cwblhau mewn dilyniant wedi'i ddiffinio'n llym ar amserlen benodol. Mae Airflow yn cyflwyno rhyngwyneb gwe cyfleus ar gyfer gweithio gyda DAGs ac endidau eraill:

Mae llif aer yn offeryn i ddatblygu a chynnal prosesau prosesu data swp yn gyfleus ac yn gyflym

Efallai y bydd DAG yn edrych fel hyn:

Mae llif aer yn offeryn i ddatblygu a chynnal prosesau prosesu data swp yn gyfleus ac yn gyflym

Wrth ddylunio DAG, mae datblygwr yn gosod set o weithredwyr y bydd tasgau o fewn y DAG yn cael eu hadeiladu arnynt. Yma rydym yn dod at endid pwysig arall: y Gweithredwr Llif Awyr.

Gweithredwyr

Mae gweithredwr yn endid ar sail pa enghreifftiau o swyddi sy'n cael eu creu, sy'n disgrifio beth fydd yn digwydd wrth gyflawni enghraifft swydd. Rhyddhau llif aer o GitHub eisoes yn cynnwys set o ddatganiadau yn barod i'w defnyddio. Enghreifftiau:

  • Mae BashOperator yn weithredwr ar gyfer gweithredu gorchymyn bash.
  • Mae PythonOperator yn weithredwr ar gyfer galw cod Python.
  • EmailOperator - gweithredwr ar gyfer anfon e-bost.
  • HTTPOperator - gweithredwr ar gyfer gweithio gyda cheisiadau http.
  • Mae SqlOperator yn weithredwr ar gyfer gweithredu cod SQL.
  • Mae synhwyrydd yn weithredwr ar gyfer aros am ddigwyddiad (dyfodiad yr amser a ddymunir, ymddangosiad y ffeil ofynnol, rhes yn y gronfa ddata, ymateb gan yr API, ac ati, ac ati).

Mae yna weithredwyr mwy penodol: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Gallwch hefyd ddatblygu gweithredwyr i weddu i'ch anghenion a'u defnyddio yn eich prosiect. Er enghraifft, rydym wedi creu MongoDBToHiveViaHdfsTransfer, gweithredwr ar gyfer allforio dogfennau o MongoDB i Hive, a sawl gweithredwr ar gyfer gweithio gyda nhw. CliciwchHouse: CHLoadFromHiveOperator a CHTableLoaderOperator. Mewn gwirionedd, cyn gynted ag y bydd prosiect wedi defnyddio cod sy'n seiliedig ar ddatganiadau sylfaenol yn aml, gallwch chi feddwl am ei lunio mewn datganiad newydd. Bydd hyn yn symleiddio datblygiad pellach, a byddwch yn ychwanegu at eich llyfrgell o weithredwyr yn y prosiect.

Ymhellach, mae angen cyflawni'r holl achosion hyn o dasgau, a nawr byddwn yn siarad am y trefnydd.

Trefnydd

Adeiladir ar y trefnydd tasgau yn Airflow Seleri. Llyfrgell Python yw Selery sy'n eich galluogi i drefnu ciw ynghyd â chyflawni tasgau anghydamserol a gwasgaredig. O'r ochr Llif Awyr, mae pob tasg wedi'i rhannu'n byllau. Mae pyllau yn cael eu creu â llaw. Fel rheol, eu pwrpas yw cyfyngu ar y llwyth ar weithio gyda'r ffynhonnell neu deipio tasgau y tu mewn i'r DWH. Gellir rheoli pyllau trwy'r rhyngwyneb gwe:

Mae llif aer yn offeryn i ddatblygu a chynnal prosesau prosesu data swp yn gyfleus ac yn gyflym

Mae gan bob pwll gyfyngiad ar nifer y slotiau. Wrth greu DAG, rhoddir cronfa iddo:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Gellir diystyru'r gronfa a osodwyd ar y lefel DAG ar lefel y dasg.
Mae proses ar wahân, Scheduler, yn gyfrifol am amserlennu'r holl dasgau yn Airflow. Mewn gwirionedd, mae'r Trefnydd yn delio â'r holl fecanweithiau o osod tasgau i'w cyflawni. Mae tasg yn mynd trwy sawl cam cyn ei chyflawni:

  1. Mae tasgau blaenorol wedi'u cwblhau yn y DAG, gellir ciwio un newydd.
  2. Mae'r ciw yn cael ei ddidoli yn dibynnu ar flaenoriaeth y tasgau (gellir rheoli blaenoriaethau hefyd), ac os oes slot am ddim yn y pwll, gellir cymryd y dasg i weithio.
  3. Os oes seleri gweithiwr rhydd, anfonir y dasg iddo; mae'r gwaith y gwnaethoch ei raglennu yn y dasg yn dechrau, gan ddefnyddio un gweithredwr neu'r llall.

Digon syml.

Mae'r Trefnydd yn rhedeg ar set o'r holl DAGs a'r holl dasgau o fewn DAGs.

Er mwyn i’r Trefnydd ddechrau gweithio gyda’r DAG, mae angen i’r DAG osod amserlen:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Mae set o ragosodiadau parod: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Gallwch hefyd ddefnyddio ymadroddion cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Dyddiad Cyflawni

Er mwyn deall sut mae Llif Awyr yn gweithio, mae'n bwysig deall beth yw Dyddiad Cyflawni ar gyfer DAG. Mae gan y DAG Llif Awyr y dimensiwn Dyddiad Cyflawni, h.y., yn dibynnu ar amserlen waith y DAG, crëir enghreifftiau tasg ar gyfer pob Dyddiad Cyflawni. Ac ar gyfer pob Dyddiad Cyflawni, gellir ail-gyflawni tasgau - neu, er enghraifft, gall DAG weithio ar yr un pryd mewn sawl Dyddiad Cyflawni. Dangosir hyn yn glir yma:

Mae llif aer yn offeryn i ddatblygu a chynnal prosesau prosesu data swp yn gyfleus ac yn gyflym

Yn anffodus (neu efallai'n ffodus: mae'n dibynnu ar y sefyllfa), os yw gweithrediad y dasg yn y DAG yn gywir, yna bydd y gweithredu yn y Dyddiad Cyflawni blaenorol yn cyd-fynd â'r addasiadau. Mae hyn yn dda os oes angen i chi ailgyfrifo data yn y gorffennol gan ddefnyddio algorithm newydd, ond mae'n ddrwg oherwydd bod atgynhyrchedd y canlyniad yn cael ei golli (wrth gwrs, nid oes unrhyw un yn trafferthu dychwelyd y fersiwn ofynnol o'r cod ffynhonnell o Git a chyfrifo'r hyn rydych chi angen unwaith, yn ôl yr angen).

Cynhyrchu tasgau

Mae gweithrediad DAG yn god Python, felly mae gennym ffordd gyfleus iawn i leihau faint o god wrth weithio, er enghraifft, gyda ffynonellau wedi'u torri. Tybiwch fod gennych dri darn MySQL fel ffynhonnell, mae angen i chi ddringo i mewn i bob un a chasglu rhywfaint o ddata. Ac yn annibynnol ac yn gyfochrog. Efallai y bydd y cod Python yn y DAG yn edrych fel hyn:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

Mae'r DAG yn edrych fel hyn:

Mae llif aer yn offeryn i ddatblygu a chynnal prosesau prosesu data swp yn gyfleus ac yn gyflym

Ar yr un pryd, gallwch chi ychwanegu neu dynnu darn bach trwy addasu'r gosodiad a diweddaru'r DAG. Cyfforddus!

Gallwch hefyd ddefnyddio dulliau mwy cymhleth o gynhyrchu cod, er enghraifft, gweithio gyda ffynonellau ar ffurf cronfa ddata neu ddisgrifio strwythur tabl, algorithm ar gyfer gweithio gyda thabl, ac, gan ystyried nodweddion seilwaith DWH, cynhyrchu'r broses. o lwytho N tablau i mewn i'ch storfa. Neu, er enghraifft, gan weithio gydag API nad yw'n cefnogi gweithio gyda pharamedr ar ffurf rhestr, gallwch gynhyrchu N tasg yn y DAG gan ddefnyddio'r rhestr hon, cyfyngu ar gyfochrogrwydd ceisiadau yn yr API i bwll, a thynnu y data angenrheidiol o'r API. Hyblyg!

ystorfa

Mae gan Airflow ei storfa backend ei hun, cronfa ddata (efallai MySQL neu Postgres, mae gennym Postgres), sy'n storio cyflwr tasgau, DAGs, gosodiadau cysylltiad, newidynnau byd-eang, ac ati, ac ati Yma hoffwn ddweud bod yr ystorfa yn Airflow yn syml iawn (tua 20 tablau) ac yn gyfleus os ydych am adeiladu unrhyw un o'ch prosesau arno. Rwy’n cofio 100500 o fyrddau yn ystorfa Informatica, y bu’n rhaid eu smygu am amser hir cyn deall sut i adeiladu ymholiad.

Monitro

O ystyried symlrwydd yr ystorfa, gallwch adeiladu proses ar gyfer monitro tasgau sy'n gyfleus i chi. Rydyn ni'n defnyddio llyfr nodiadau yn Zeppelin, lle rydyn ni'n edrych ar statws tasgau:

Mae llif aer yn offeryn i ddatblygu a chynnal prosesau prosesu data swp yn gyfleus ac yn gyflym

Gall hefyd fod yn rhyngwyneb gwe Airflow ei hun:

Mae llif aer yn offeryn i ddatblygu a chynnal prosesau prosesu data swp yn gyfleus ac yn gyflym

Mae'r cod Airflow ar agor, felly fe wnaethom ychwanegu rhybudd yn Telegram. Mae pob enghraifft o dasg redeg, os bydd gwall yn digwydd, yn sbamio i'r grŵp Telegram, lle mae'r tîm datblygu a chymorth cyfan yn cynnwys.

Cawn ymateb prydlon trwy Telegram (os oes angen), trwy Zeppelin - darlun cyffredinol o'r tasgau yn Airflow.

Yn gyfan gwbl

Mae llif aer yn ffynhonnell agored yn gyntaf ac yn bennaf, a pheidiwch â disgwyl gwyrthiau ganddo. Byddwch yn barod i roi amser ac ymdrech i adeiladu datrysiad gweithredol. Nod o'r categori cyraeddadwy, credwch chi fi, mae'n werth chweil. Cyflymder datblygu, hyblygrwydd, rhwyddineb ychwanegu prosesau newydd - byddwch chi wrth eich bodd. Wrth gwrs, mae angen i chi dalu llawer o sylw i drefniadaeth y prosiect, sefydlogrwydd gwaith Airflow ei hun: nid oes unrhyw wyrthiau.

Nawr mae gennym Llif Awyr yn gweithio bob dydd tua 6,5 mil o dasgau. Maent yn dra gwahanol o ran eu natur. Mae yna dasgau ar gyfer llwytho data i'r prif DWH o lawer o wahanol ffynonellau a phenodol iawn, mae yna dasgau ar gyfer cyfrifo blaenau siopau y tu mewn i'r prif DWH, mae yna dasgau ar gyfer cyhoeddi data i DWH cyflym, mae yna lawer, llawer o dasgau gwahanol - a Llif Awyr yn eu cnoi drwy'r dydd. A siarad mewn niferoedd, mae hyn yn 2,3 mil Tasgau ELT o gymhlethdod amrywiol y tu mewn i DWH (Hadoop), tua 2,5 cant o gronfeydd data ffynonellau, mae hwn yn orchymyn gan 4 datblygwr ETL, sy'n cael eu rhannu'n brosesu data ETL yn DWH a phrosesu data ELT o fewn DWH ac wrth gwrs mwy un gweinyddwr, sy'n ymdrin â seilwaith y gwasanaeth.

Cynlluniau ar gyfer y dyfodol

Mae’n anochel bod nifer y prosesau’n tyfu, a’r prif beth y byddwn yn ei wneud o ran y seilwaith Llif Awyr yw graddio. Rydym am adeiladu clwstwr Llif Awyr, neilltuo cwpl o goesau ar gyfer gweithwyr Seleri, a gwneud pen dyblyg gyda phrosesau amserlennu swyddi ac ystorfa.

Epilogue

Mae hyn, wrth gwrs, ymhell o fod yn bopeth yr hoffwn ei siarad am Llif Awyr, ond ceisiais dynnu sylw at y prif bwyntiau. Daw archwaeth gyda bwyta, rhowch gynnig arni a byddwch yn ei hoffi 🙂

Ffynhonnell: hab.com

Ychwanegu sylw