Airflow: a tool for conveniently and quickly developing and maintaining data processing workflows

Hello, Habr! In this article I want to talk about one great tool for developing data processing workflows, for example in the infrastructure of a corporate DWH or your DataLake. We will talk about Apache Airflow (hereafter Airflow). It has been unfairly deprived of attention on Habr, and in the main part I will try to convince you that Airflow is at least worth a look when you choose a scheduler for your ETL/ELT processes.

Previously, I wrote a series of articles on DWH while working at Tinkoff Bank. Now I am part of the Mail.Ru Group team, where I am developing a data analytics platform for the gaming division. As news and interesting solutions appear, my team and I will write here about our data analytics platform.

Introduction

So, let's begin. What is Airflow? It is a library (or a set of libraries) for developing, scheduling and monitoring workflows. The main feature of Airflow is that Python code is used to describe (develop) processes. This has many advantages for organizing your project and your development: in essence, your ETL project (for example) is just a Python project, and you can organize it however you like, taking into account your infrastructure, team size and other requirements. The tooling is simple: use, for example, PyCharm + Git. It is wonderful and very convenient!

Now let's look at the main entities of Airflow. Once you understand their essence and purpose, you can design your process architecture well. Perhaps the main entity is the Directed Acyclic Graph (hereafter DAG).

DAG

A DAG is a meaningful grouping of your tasks that you want to run in a strictly defined order according to a specific schedule. Airflow provides a convenient web interface for working with DAGs and other entities:

[Image: the Airflow web interface]

A DAG might look like this:

[Image: graph view of an example DAG]
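To make the "strictly defined order" concrete, here is a minimal sketch that uses only the Python standard library rather than Airflow itself. The task names are hypothetical, and `graphlib` merely stands in for the ordering logic that Airflow applies to a DAG.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "truncate_stg": set(),
    "export_profiles": {"truncate_stg"},
    "export_payments": {"truncate_stg"},
    "load_dwh": {"export_profiles", "export_payments"},
}


def execution_order(deps):
    """Return one valid order in which the tasks could run."""
    return list(TopologicalSorter(deps).static_order())


def is_acyclic(deps):
    """A DAG must not contain cycles; report whether this graph qualifies."""
    try:
        execution_order(deps)
        return True
    except CycleError:
        return False


order = execution_order(dependencies)
print(order)
```

The two export tasks may appear in either order, because nothing connects them; that independence is what allows them to run in parallel.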

When designing a DAG, the developer lays out a set of operators on which the tasks inside the DAG will be built. Here we come to another important entity: the Airflow Operator.

Operators

An operator is an entity from which task instances are created; it describes what will happen when a task instance runs. The Airflow releases on GitHub already contain a set of ready-to-use operators. Examples:

  • BashOperator - an operator for executing a bash command.
  • PythonOperator - an operator for calling Python code.
  • EmailOperator - an operator for sending email.
  • HTTPOperator - an operator for working with HTTP requests.
  • SqlOperator - an operator for executing SQL code.
  • Sensor - an operator for waiting for an event (the arrival of the required time, the appearance of a required file, a row in a database, a response from an API, and so on).

There are also more specific operators: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

You can also develop operators tailored to your own needs and use them in your project. For example, we created MongoDBToHiveViaHdfsTransfer, an operator for exporting documents from MongoDB to Hive, and several operators for working with ClickHouse: CHLoadFromHiveOperator and CHTableLoaderOperator. In essence, as soon as a project has frequently used code built on the basic operators, you can think about turning it into a new operator. This simplifies further development, and you expand the project's library of operators.
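The pattern behind a custom operator can be sketched in plain Python. In Airflow you would subclass the library's `BaseOperator` and override `execute(self, context)`. The `BaseOperator` below is only a toy stand-in so that the example runs without Airflow, and `RowCountCheckOperator` is a hypothetical operator invented for illustration.

```python
class BaseOperator:
    """Toy stand-in for an operator base class (not Airflow's real BaseOperator)."""

    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self, context):
        raise NotImplementedError


class RowCountCheckOperator(BaseOperator):
    """Hypothetical operator: fail the task if a table has too few rows."""

    def __init__(self, task_id, count_rows, min_rows):
        super().__init__(task_id)
        self.count_rows = count_rows  # callable returning the current row count
        self.min_rows = min_rows

    def execute(self, context):
        rows = self.count_rows()
        if rows < self.min_rows:
            raise ValueError(
                f"{self.task_id}: expected at least {self.min_rows} rows, got {rows}"
            )
        return rows


# The lambda replaces a real database query in this sketch.
check = RowCountCheckOperator("check_profiles", count_rows=lambda: 120, min_rows=100)
result = check.execute(context={"execution_date": "2018-01-01"})
print(result)
```

Once such a check is wrapped in an operator, every DAG in the project can reuse it with different parameters instead of repeating the logic.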

Next, all these task instances need to be executed, so now we will talk about the scheduler.

Scheduler

In our setup, Airflow's task execution is built on Celery (Celery is one of several executors that Airflow supports). Celery is a Python library that lets you organize a queue plus asynchronous and distributed execution of tasks. On the Airflow side, all tasks are divided into pools. Pools are created manually. Typically, their purpose is to limit the load on a source or to group tasks by type inside the DWH. Pools can be managed through the web interface:

[Image: pool management in the Airflow web interface]

Each pool has a limit on the number of slots. When a DAG is created, it is assigned a pool:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable

# Settings shared by every task in the DAG.
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'  # pool that the DAG's tasks are assigned to
PRIORITY_WEIGHT = 10

# Start from midnight of the previous day.
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__  # reuse the module docstring as the DAG description

A pool defined at the DAG level can be overridden at the task level.

A separate process, the Scheduler, is responsible for scheduling all tasks in Airflow. In fact, the Scheduler handles all the mechanics of queuing tasks for execution. A task goes through several stages before it is executed:

  1. The previous tasks in the DAG have completed, so the new one can be queued.
  2. The queue is sorted by task priority (priorities can also be managed), and if there is a free slot in the pool, the task can be taken into work.
  3. If there is a free Celery worker, the task is sent to it, and the work you programmed in the task begins, using one operator or another.

Simple enough.
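The three stages above can be sketched as a small simulation. This is not Airflow's scheduler code, only an illustration under simplified assumptions: the task specs, the function `pick_runnable` and the slot bookkeeping are all hypothetical.

```python
import heapq


def pick_runnable(tasks, done, free_slots):
    """Return the task ids to start now, highest priority first.

    tasks: dict task_id -> {"upstream": set, "priority": int, "pool": str}
    done: set of finished task ids
    free_slots: dict pool name -> free slot count (mutated as slots are taken)
    """
    # Stage 1: only tasks whose upstream tasks have all finished can be queued.
    queue = [
        (-spec["priority"], task_id)
        for task_id, spec in tasks.items()
        if task_id not in done and spec["upstream"] <= done
    ]
    heapq.heapify(queue)

    started = []
    # Stage 2: take tasks by priority while their pool still has a free slot.
    while queue:
        _, task_id = heapq.heappop(queue)
        pool = tasks[task_id]["pool"]
        if free_slots.get(pool, 0) > 0:
            free_slots[pool] -= 1
            # Stage 3: a real system would hand the task to a free worker here.
            started.append(task_id)
    return started


tasks = {
    "truncate": {"upstream": set(), "priority": 10, "pool": "dma_pool"},
    "export_a": {"upstream": {"truncate"}, "priority": 5, "pool": "dma_pool"},
    "export_b": {"upstream": {"truncate"}, "priority": 8, "pool": "dma_pool"},
    "export_c": {"upstream": {"truncate"}, "priority": 1, "pool": "dma_pool"},
}
started = pick_runnable(tasks, done={"truncate"}, free_slots={"dma_pool": 2})
print(started)
```

With two free slots, the two highest-priority export tasks start and the third waits for the next pass.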

The Scheduler runs over the set of all DAGs and all tasks inside the DAGs.

For the Scheduler to start working with a DAG, the DAG needs a schedule:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

There is a set of ready-made presets: @once, @hourly, @daily, @weekly, @monthly, @yearly.

You can also use cron expressions:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
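Presets and cron expressions are two spellings of the same thing. The sketch below maps each recurring preset to the cron expression that, to my understanding, Airflow documents for it. The helper `to_cron` is hypothetical and exists only for illustration.

```python
# Cron equivalents of the recurring presets; "@once" has no cron form.
CRON_PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def to_cron(schedule_interval):
    """Translate a preset to its cron expression; pass cron strings through."""
    if schedule_interval == "@once":
        return None  # a single run, no recurring schedule
    return CRON_PRESETS.get(schedule_interval, schedule_interval)


print(to_cron("@hourly"))
print(to_cron("*/10 * * * *"))
```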

Execution Date

To understand how Airflow works, it is important to understand what the Execution Date is for a DAG. In Airflow, a DAG has an Execution Date dimension: depending on the DAG's schedule, task instances are created for each Execution Date. For each Execution Date, tasks can be re-run, or, for example, a DAG can run simultaneously for several Execution Dates. This is clearly shown here:

[Image: DAG runs laid out by Execution Date]
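A minimal sketch of how Execution Dates follow from a schedule, assuming the convention (as I understand it for Airflow versions of this period) that the run stamped with a given Execution Date starts only after that schedule interval has ended. The function `execution_dates` is hypothetical, not an Airflow API.

```python
from datetime import datetime, timedelta


def execution_dates(start_date, interval, now):
    """List the execution dates whose schedule interval has fully elapsed by `now`."""
    dates = []
    current = start_date
    # A run stamped `current` covers [current, current + interval) and
    # becomes eligible only once that period is over.
    while current + interval <= now:
        dates.append(current)
        current += interval
    return dates


dates = execution_dates(
    start_date=datetime(2018, 1, 1),
    interval=timedelta(days=1),
    now=datetime(2018, 1, 4, 12, 0),
)
for d in dates:
    print(d.date())
```

At noon on January 4, the daily runs for January 1, 2 and 3 exist, while the run for January 4 has to wait until that day is over.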

Unfortunately (or perhaps fortunately, depending on the situation), if the implementation of a task in a DAG is changed, then runs for previous Execution Dates will use the changed code. This is good if you need to recompute data for past periods with a new algorithm, but bad because reproducibility of the result is lost (of course, nobody stops you from checking out the required version of the source code from Git and computing what you need once, the way you need it).

Generating tasks

A DAG's implementation is Python code, so we have a convenient way to reduce the amount of code when working, for example, with sharded sources. Say you have three MySQL shards as a source, and you need to go into each one and pick up some data, independently and in parallel. The Python code in the DAG might look like this:

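# `lv`, `SqlToHiveViaHdfsTransfer`, `exec_truncate_stg` and `load_profiles`
# are defined elsewhere in the project and are not shown here.
connection_list = lv.get('connection_list')

# The same query runs on every shard; shard_id is filled in from params.
export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

# Create one export task per shard connection.
for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:]},  # last character of the connection id
        compress=None,
        dag=dag
    )
    # Each export runs after the staging truncate and before the final load.
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)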

The DAG looks like this:

[Image: the generated DAG with one export task per shard]

In this case, you can add or remove a shard simply by adjusting the setting and updating the DAG. Convenient!

You can also use more complex code generation. For example, you can describe your sources in a database, or describe a table structure and an algorithm for working with the table, and then, taking into account the specifics of your DWH infrastructure, generate a process for loading N tables into your storage. Or, when working with an API that does not accept a list as a parameter, you can generate N tasks in the DAG from that list, limit the parallelism of API requests with a pool, and pull the data you need from the API. Flexible!
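The API case can be sketched without Airflow: one unit of work is generated per list element, and a fixed number of slots caps how many run at once. Here a `ThreadPoolExecutor` plays the role of the pool. The ids, the slot count and `fetch_one` are hypothetical, and no real request is made.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

POOL_SLOTS = 2  # hypothetical limit on parallel API requests
account_ids = ["a1", "a2", "a3", "a4", "a5"]  # hypothetical list parameter

lock = threading.Lock()
stats = {"running": 0, "max_running": 0}  # tracks observed parallelism


def fetch_one(account_id):
    """Stand-in for one generated task: a single API call for one id."""
    with lock:
        stats["running"] += 1
        stats["max_running"] = max(stats["max_running"], stats["running"])
    try:
        return {"account_id": account_id, "status": "ok"}  # no real request here
    finally:
        with lock:
            stats["running"] -= 1


# One task per list element; at most POOL_SLOTS of them run at once.
with ThreadPoolExecutor(max_workers=POOL_SLOTS) as executor:
    results = list(executor.map(fetch_one, account_ids))

print(len(results), stats["max_running"])
```

In Airflow the same limit would come from assigning the generated tasks to a pool with that many slots, so the API never sees more parallel requests than it can handle.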

Repository

Airflow has its own backend repository, a database (it can be MySQL or Postgres; we use Postgres), which stores the states of tasks, DAGs, connection settings, global variables, and so on. I would say that the repository in Airflow is very simple (about 20 tables) and convenient if you want to build your own processes on top of it. I remember the 100500 tables in the Informatica repository, which you had to study for a long time before you understood how to build a query.

Monitoring

Given the simplicity of the repository, you can build a task monitoring process that suits you. We use a notebook in Zeppelin, where we look at the status of tasks:

[Image: task status notebook in Zeppelin]
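A monitoring query over the repository can be sketched as follows. An in-memory SQLite database stands in for the real MySQL/Postgres repository, and the `task_instance` table is reduced to four columns; the real schema has more columns and may differ between Airflow versions, so treat the table layout and the sample rows as assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the real repository database
conn.execute(
    "CREATE TABLE task_instance "
    "(task_id TEXT, dag_id TEXT, execution_date TEXT, state TEXT)"
)
# Made-up sample rows, for illustration only.
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("export_profiles_from_shard1", "dma_load", "2018-01-01", "success"),
        ("export_profiles_from_shard2", "dma_load", "2018-01-01", "failed"),
        ("load_profiles", "dma_load", "2018-01-01", "upstream_failed"),
        ("export_profiles_from_shard1", "dma_load", "2018-01-02", "success"),
    ],
)


def state_counts(connection, dag_id):
    """Count task instances per state for one DAG."""
    rows = connection.execute(
        "SELECT state, COUNT(*) FROM task_instance "
        "WHERE dag_id = ? GROUP BY state ORDER BY state",
        (dag_id,),
    )
    return dict(rows.fetchall())


counts = state_counts(conn, "dma_load")
print(counts)
```

A notebook can run a query of this kind on a schedule and chart the counts, which is roughly the overview that a status dashboard provides.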

It can also be the Airflow web interface itself:

[Image: task monitoring in the Airflow web interface]

Airflow's code is open, so we added alerting to Telegram. Every running task instance, if an error occurs, posts to a Telegram group that includes the whole development and support team.

We get a prompt reaction through Telegram (when one is needed), and through Zeppelin we get the overall picture of tasks in Airflow.
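One way to wire up such alerts is a failure callback; Airflow tasks accept an `on_failure_callback` argument for this purpose. The sketch below shows only the shape of that callback and is not our actual integration. The context keys are simplified assumptions (a real Airflow context carries richer objects), `build_alert` and `notify_failure` are hypothetical names, and the sender is a plain list instead of a real chat API call.

```python
def build_alert(context):
    """Format a failure message from a context-like dict (keys are assumptions)."""
    return (
        "Task failed\n"
        f"DAG: {context['dag_id']}\n"
        f"Task: {context['task_id']}\n"
        f"Execution date: {context['execution_date']}\n"
        f"Error: {context['exception']}"
    )


def notify_failure(context, send):
    """Failure callback: build the text and pass it to a sender function."""
    text = build_alert(context)
    send(text)
    return text


sent = []  # collects messages instead of calling a real chat API
notify_failure(
    {
        "dag_id": "dma_load",
        "task_id": "load_profiles",
        "execution_date": "2018-01-01",
        "exception": "connection refused",
    },
    send=sent.append,
)
print(sent[0])
```

Keeping the sender as a parameter makes the callback easy to test and lets you swap Telegram for another channel later.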

Summary

Airflow is, first and foremost, open source, and you should not expect miracles from it. Be prepared to invest time and effort to build a working solution. The goal is achievable and, believe me, it is worth it. Development speed, flexibility, ease of adding new processes: you will like it. Of course, you need to pay a lot of attention to the organization of the project and to the stability of Airflow itself: miracles do not happen.

Airflow now runs about 6.5 thousand tasks for us every day. They are quite varied in nature. There are tasks that load data into the main DWH from many different and very specific sources, tasks that compute data marts inside the main DWH, tasks that publish data to a fast DWH, and many, many other tasks, and Airflow chews through all of them day after day. In numbers, this is 2.3 thousand ELT tasks of varying complexity inside the DWH (Hadoop), about 250 source databases, and a team of 4 ETL developers, split into ETL data processing into the DWH and ELT data processing inside the DWH, plus, of course, one admin who handles the service's infrastructure.

Plans for the future

The number of processes inevitably grows, and the main thing we will do with the Airflow infrastructure is scaling. We want to build an Airflow cluster, dedicate a pair of nodes to Celery workers, and set up a redundant head node with the job scheduling processes and the repository.

Epilogue

This is, of course, far from everything I would like to say about Airflow, but I have tried to highlight the main points. Appetite comes with eating: try it and you will like it :)

source: www.habr.com
