Mtiririko wa hewa ni zana ya kukuza na kudumisha michakato ya kuchakata data ya kundi kwa urahisi na haraka

Mtiririko wa hewa ni zana ya kukuza na kudumisha michakato ya kuchakata data ya kundi kwa urahisi na haraka

Habari, Habr! Katika makala hii nataka kuzungumza juu ya chombo kimoja kikubwa cha kuendeleza michakato ya usindikaji wa data ya kundi, kwa mfano, katika miundombinu ya DWH ya shirika au DataLake yako. Tutazungumza juu ya Apache Airflow (hapa inajulikana kama Airflow). Imenyimwa tahadhari isivyo haki kwa Habré, na kwa sehemu kuu nitajaribu kukushawishi kwamba angalau Airflow inafaa kuangaliwa unapochagua kipanga ratiba kwa michakato yako ya ETL/ELT.

Hapo awali, niliandika mfululizo wa makala juu ya mada ya DWH nilipokuwa nikifanya kazi katika Benki ya Tinkoff. Sasa nimekuwa sehemu ya timu ya Mail.Ru Group na ninaunda jukwaa la uchambuzi wa data katika eneo la michezo ya kubahatisha. Kwa kweli, habari na masuluhisho ya kuvutia yanapoonekana, mimi na timu yangu tutazungumza hapa kuhusu jukwaa letu la uchanganuzi wa data.

Dibaji

Kwa hiyo, hebu tuanze. Airflow ni nini? Hii ni maktaba (au seti ya maktaba) kukuza, kupanga na kufuatilia michakato ya kazi. Kipengele kikuu cha Airflow: Msimbo wa Python hutumiwa kuelezea (kuza) michakato. Hii ina faida nyingi za kupanga mradi wako na maendeleo: kwa asili, mradi wako (kwa mfano) wa ETL ni mradi wa Python tu, na unaweza kuupanga kama unavyotaka, kwa kuzingatia maalum ya miundombinu, saizi ya timu na. mahitaji mengine. Kwa vyombo kila kitu ni rahisi. Tumia kwa mfano PyCharm + Git. Ni ya ajabu na rahisi sana!

Sasa hebu tuangalie vyombo kuu vya Airflow. Kwa kuelewa kiini na madhumuni yao, unaweza kupanga kikamilifu usanifu wako wa mchakato. Labda huluki kuu ni Grafu Inayoelekezwa ya Acyclic (hapa inajulikana kama DAG).

DAG

DAG ni uhusiano fulani wa maana wa kazi zako ambao ungependa kukamilisha katika mlolongo uliobainishwa kabisa kulingana na ratiba mahususi. Airflow hutoa kiolesura cha wavuti kinachofaa kwa kufanya kazi na DAG na vyombo vingine:

Mtiririko wa hewa ni zana ya kukuza na kudumisha michakato ya kuchakata data ya kundi kwa urahisi na haraka

DAG inaweza kuonekana kama hii:

Mtiririko wa hewa ni zana ya kukuza na kudumisha michakato ya kuchakata data ya kundi kwa urahisi na haraka

Msanidi programu, wakati wa kuunda DAG, anaweka seti ya waendeshaji ambayo kazi ndani ya DAG itajengwa. Hapa tunakuja kwa chombo kingine muhimu: Opereta ya Utiririshaji wa hewa.

Waendeshaji

Opereta ni huluki kwa misingi ambayo matukio ya kazi yanaundwa, ambayo inaelezea kitakachotokea wakati wa utekelezaji wa mfano wa kazi. Utoaji wa mtiririko wa hewa kutoka GitHub tayari ina seti ya waendeshaji tayari kutumika. Mifano:

  • BashOperator - mwendeshaji wa kutekeleza amri ya bash.
  • PythonOperator - mwendeshaji wa kupiga nambari ya Python.
  • EmailOperator - mwendeshaji wa kutuma barua pepe.
  • HTTPOperator - operator kwa kufanya kazi na maombi ya http.
  • SqlOperator - mwendeshaji wa kutekeleza msimbo wa SQL.
  • Sensor ni operator wa kusubiri tukio (kuwasili kwa muda unaohitajika, kuonekana kwa faili inayohitajika, mstari katika hifadhidata, jibu kutoka kwa API, nk, nk).

Kuna waendeshaji maalum zaidi: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Unaweza pia kuendeleza waendeshaji kulingana na sifa zako mwenyewe na kuzitumia katika mradi wako. Kwa mfano, tuliunda MongoDBToHiveViaHdfsTransfer, opereta wa kusafirisha hati kutoka MongoDB hadi Hive, na waendeshaji kadhaa kwa kufanya kazi nao. Bonyeza Nyumba: CLoadFromHiveOperator na CHTableLoaderOperator. Kimsingi, mara tu mradi unapotumia nambari ya kuthibitisha iliyojengwa kwenye taarifa za kimsingi mara kwa mara, unaweza kufikiria kuijenga kuwa taarifa mpya. Hii itarahisisha maendeleo zaidi, na utapanua maktaba yako ya waendeshaji katika mradi huo.

Ifuatayo, matukio haya yote ya kazi yanahitaji kutekelezwa, na sasa tutazungumza juu ya mpangilio.

Mratibu

Kipanga ratiba cha kazi cha Airflow kimejengwa juu yake Celery. Celery ni maktaba ya Python ambayo hukuruhusu kupanga foleni pamoja na utekelezaji wa kazi usio na usawa na uliosambazwa. Kwa upande wa Airflow, kazi zote zimegawanywa katika mabwawa. Mabwawa yanaundwa kwa mikono. Kwa kawaida, madhumuni yao ni kupunguza mzigo wa kazi wa kufanya kazi na chanzo au kuchapa kazi ndani ya DWH. Mabwawa yanaweza kusimamiwa kupitia kiolesura cha wavuti:

Mtiririko wa hewa ni zana ya kukuza na kudumisha michakato ya kuchakata data ya kundi kwa urahisi na haraka

Kila bwawa lina kikomo kwa idadi ya inafaa. Wakati wa kuunda DAG, inapewa bwawa:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Dimbwi lililofafanuliwa katika kiwango cha DAG linaweza kubatilishwa katika kiwango cha kazi.
Mchakato tofauti, Mratibu, ana jukumu la kuratibu kazi zote katika Airflow. Kwa kweli, Mratibu hushughulika na mbinu zote za kuweka kazi za utekelezaji. Kazi hupitia hatua kadhaa kabla ya kutekelezwa:

  1. Kazi za awali zimekamilishwa katika DAG; nyingine mpya inaweza kuwekwa kwenye foleni.
  2. Foleni imepangwa kulingana na kipaumbele cha kazi (vipaumbele vinaweza pia kudhibitiwa), na ikiwa kuna slot ya bure kwenye bwawa, kazi inaweza kuchukuliwa kwa uendeshaji.
  3. Ikiwa kuna celery ya mfanyakazi wa bure, kazi inatumwa kwake; kazi uliyopanga kwenye shida huanza, kwa kutumia mwendeshaji mmoja au mwingine.

Rahisi kutosha.

Kiratibu hutekeleza seti ya DAG zote na kazi zote ndani ya DAG.

Ili Mratibu aanze kufanya kazi na DAG, DAG inahitaji kuweka ratiba:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Kuna seti ya vifaa vilivyotengenezwa tayari: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Unaweza pia kutumia misemo ya cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Tarehe ya Utekelezaji

Ili kuelewa jinsi Airflow inavyofanya kazi, ni muhimu kuelewa Tarehe ya Utekelezaji ni nini kwa DAG. Katika mtiririko wa hewa, DAG ina kipimo cha Tarehe ya Utekelezaji, yaani, kulingana na ratiba ya kazi ya DAG, matukio ya majukumu yanaundwa kwa kila Tarehe ya Utekelezaji. Na kwa kila Tarehe ya Utekelezaji, kazi zinaweza kutekelezwa tena - au, kwa mfano, DAG inaweza kufanya kazi wakati huo huo katika Tarehe kadhaa za Utekelezaji. Hii inaonyeshwa wazi hapa:

Mtiririko wa hewa ni zana ya kukuza na kudumisha michakato ya kuchakata data ya kundi kwa urahisi na haraka

Kwa bahati mbaya (au labda kwa bahati nzuri: inategemea hali hiyo), ikiwa utekelezaji wa kazi katika DAG unarekebishwa, basi utekelezaji katika Tarehe ya Utekelezaji uliopita utaendelea kuzingatia marekebisho. Hii ni nzuri ikiwa unahitaji kuhesabu tena data katika vipindi vya zamani kwa kutumia algorithm mpya, lakini ni mbaya kwa sababu uboreshaji wa matokeo umepotea (kwa kweli, hakuna mtu anayekusumbua kurudisha toleo linalohitajika la nambari ya chanzo kutoka Git na kuhesabu ni nini. unahitaji wakati mmoja, jinsi unavyohitaji).

Kuzalisha kazi

Utekelezaji wa DAG ni msimbo katika Python, kwa hiyo tuna njia rahisi sana ya kupunguza kiasi cha msimbo wakati wa kufanya kazi, kwa mfano, na vyanzo vya sharded. Wacha tuseme una shards tatu za MySQL kama chanzo, unahitaji kupanda ndani ya kila moja na kuchukua data fulani. Aidha, kwa kujitegemea na kwa sambamba. Nambari ya Python katika DAG inaweza kuonekana kama hii:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG inaonekana kama hii:

Mtiririko wa hewa ni zana ya kukuza na kudumisha michakato ya kuchakata data ya kundi kwa urahisi na haraka

Katika kesi hii, unaweza kuongeza au kuondoa shard kwa kurekebisha tu mipangilio na uppdatering DAG. Raha!

Unaweza pia kutumia uundaji wa nambari ngumu zaidi, kwa mfano, kufanya kazi na vyanzo katika mfumo wa hifadhidata au kuelezea muundo wa meza, algorithm ya kufanya kazi na meza, na, kwa kuzingatia sifa za miundombinu ya DWH, toa mchakato. kwa kupakia jedwali N kwenye hifadhi yako. Au, kwa mfano, kufanya kazi na API ambayo haiauni kufanya kazi na kigezo katika mfumo wa orodha, unaweza kutoa kazi za N kwenye DAG kutoka kwenye orodha hii, kupunguza ulinganifu wa maombi katika API hadi kwenye dimbwi, na kukwaruza. data muhimu kutoka kwa API. Inabadilika!

hazina

Airflow ina hazina yake ya nyuma, hifadhidata (inaweza kuwa MySQL au Postgres, tuna Postgres), ambayo huhifadhi majimbo ya kazi, DAG, mipangilio ya muunganisho, vigeu vya kimataifa, n.k., n.k. Hapa ningependa niseme kwamba hazina katika Airflow ni rahisi sana (takriban jedwali 20) na ni rahisi ikiwa unataka kuunda michakato yako mwenyewe juu yake. Nakumbuka jedwali 100500 kwenye hazina ya Informatica, ambayo ilibidi isomwe kwa muda mrefu kabla ya kuelewa jinsi ya kuunda swala.

Ufuatiliaji

Kwa kuzingatia unyenyekevu wa hazina, unaweza kuunda mchakato wa ufuatiliaji wa kazi ambao unafaa kwako. Tunatumia daftari huko Zeppelin, ambapo tunaangalia hali ya kazi:

Mtiririko wa hewa ni zana ya kukuza na kudumisha michakato ya kuchakata data ya kundi kwa urahisi na haraka

Hii pia inaweza kuwa kiolesura cha wavuti cha Airflow yenyewe:

Mtiririko wa hewa ni zana ya kukuza na kudumisha michakato ya kuchakata data ya kundi kwa urahisi na haraka

Msimbo wa Airflow ni chanzo huria, kwa hivyo tumeongeza arifa kwa Telegram. Kila mfano unaoendelea wa kazi, ikiwa hitilafu itatokea, hutuma barua taka kwenye kikundi kwenye Telegramu, ambapo timu nzima ya uendelezaji na usaidizi inajumuisha.

Tunapokea jibu la haraka kupitia Telegramu (ikihitajika), na kupitia Zeppelin tunapokea picha ya jumla ya kazi katika Airflow.

Katika jumla ya

Airflow kimsingi ni chanzo wazi, na hupaswi kutarajia miujiza kutoka kwayo. Kuwa tayari kuweka wakati na juhudi kujenga suluhisho linalofanya kazi. Lengo linaweza kufikiwa, niamini, inafaa. Kasi ya maendeleo, kubadilika, urahisi wa kuongeza michakato mpya - utaipenda. Bila shaka, unahitaji kulipa kipaumbele sana kwa shirika la mradi huo, utulivu wa Airflow yenyewe: miujiza haifanyiki.

Sasa tuna Airflow inafanya kazi kila siku kuhusu kazi elfu 6,5. Wao ni tofauti kabisa katika tabia. Kuna kazi za kupakia data kwenye DWH kuu kutoka kwa vyanzo vingi tofauti na maalum sana, kuna kazi za kuhesabu mbele ya duka ndani ya DWH kuu, kuna kazi za kuchapisha data kwenye DWH ya haraka, kuna kazi nyingi, nyingi tofauti - na Airflow. huwatafuna wote siku baada ya siku. Kuzungumza kwa idadi, hii ni 2,3 elfu Kazi za ELT za ugumu tofauti ndani ya DWH (Hadoop), takriban. hifadhidata mia 2,5 vyanzo, hii ni timu kutoka Watengenezaji 4 wa ETL, ambazo zimegawanywa katika usindikaji wa data wa ETL katika usindikaji wa data wa DWH na ELT ndani ya DWH na bila shaka zaidi msimamizi mmoja, anayeshughulika na miundombinu ya huduma.

Mipango ya siku zijazo

Idadi ya michakato inakua bila kuepukika, na jambo kuu ambalo tutakuwa tukifanya katika suala la miundombinu ya Airflow ni kuongeza. Tunataka kuunda kikundi cha Airflow, kutenga jozi ya miguu kwa wafanyikazi wa Celery, na kutengeneza kichwa cha kujinakili chenye michakato ya kuratibu kazi na hazina.

Epilogue

Hii, kwa kweli, sio kila kitu ambacho ningependa kusema juu ya Airflow, lakini nilijaribu kuangazia mambo kuu. Hamu inakuja na kula, jaribu na utaipenda :)

Chanzo: mapenzi.com

Kuongeza maoni