Ikuku ikuku bụ ngwá ọrụ maka ịmepụta ngwa ngwa na ịmepụta usoro nhazi data batch

Ikuku ikuku bụ ngwá ọrụ maka ịmepụta ngwa ngwa na ịmepụta usoro nhazi data batch

Ndewo, Habr! N'isiokwu a, achọrọ m ikwu maka otu nnukwu ngwá ọrụ maka ịmepụta usoro nhazi data batch, dịka ọmụmaatụ, na akụrụngwa nke DWH ụlọ ọrụ ma ọ bụ DataLake gị. Anyị ga-ekwu maka Apache Airflow (nke a na-akpọ Airflow). A napụrụ ya nlebara anya n'ụzọ na-ezighị ezi na Habré, na akụkụ bụ isi m ga-agbalị ime ka ị kwenye na ọ dịkarịa ala Airflow kwesịrị ile anya mgbe ị na-ahọrọ onye nhazi maka usoro ETL / ELT gị.

Na mbụ, edere m usoro isiokwu gbasara isiokwu DWH mgbe m na-arụ ọrụ na Tinkoff Bank. Ugbu a abụrụla m akụkụ nke otu Mail.Ru Group ma na-emepụta usoro ikpo okwu maka nyocha data na mpaghara egwuregwu. N'ezie, dị ka akụkọ na ngwọta na-adọrọ mmasị na-apụta, mụ na ndị otu m ga-ekwu okwu ebe a gbasara ikpo okwu anyị maka nyocha data.

Nkọwapụta

Ya mere, ka anyị malite. Kedu ihe bụ Airflow? Nke a bụ ụlọ akwụkwọ (ma ọ bụ set nke ọba akwụkwọ) ịzụlite, hazie na nyochaa usoro ọrụ. Akụkụ bụ isi nke Airflow: A na-eji koodu Python kọwaa (ịzụlite) usoro. Nke a nwere ọtụtụ uru maka ịhazi ọrụ gị na mmepe gị: n'ezie, ọrụ gị (dịka ọmụmaatụ) ọrụ ETL bụ naanị ọrụ Python, ị nwere ike ịhazi ya dịka ịchọrọ, na-eburu n'uche nkọwa nke akụrụngwa, nha otu na otu. ihe ndị ọzọ chọrọ. Instrumentally ihe niile dị mfe. Jiri ọmụmaatụ PyCharm + Git. Ọ mara mma ma dịkwa mma!

Ugbu a, ka anyị leba anya na isi ụlọ ọrụ nke Airflow. Site n'ịghọta ihe bụ isi na ebumnuche ha, ị nwere ike hazie nhazi usoro gị nke ọma. Ikekwe ihe bụ isi bụ eserese Acyclic eduzi (nke a na-akpọkwa ya DAG).

DAG

DAG bụ ụfọdụ mkpakọrịta bara uru nke ọrụ gị nke ịchọrọ imezu n'usoro akọwapụtara nke ọma dịka usoro nhazi. Airflow na-enye interface webụ dị mma maka ịrụ ọrụ na DAG na ụlọ ọrụ ndị ọzọ:

Ikuku ikuku bụ ngwá ọrụ maka ịmepụta ngwa ngwa na ịmepụta usoro nhazi data batch

DAG nwere ike ịdị ka nke a:

Ikuku ikuku bụ ngwá ọrụ maka ịmepụta ngwa ngwa na ịmepụta usoro nhazi data batch

Onye nrụpụta, mgbe ị na-emepụta DAG, na-atọgbọrọ ndị na-arụ ọrụ nke a ga-arụ ọrụ n'ime DAG. N'ebe a, anyị na-abịakwute ụlọ ọrụ ọzọ dị mkpa: Airflow Operator.

Ndị ọrụ

Onye na-arụ ọrụ bụ ụlọ ọrụ na-adabere n'ụdị ọrụ, nke na-akọwa ihe ga-eme n'oge a na-eme ihe ngosi ọrụ. Mwepụta ikuku na GitHub enweelarị usoro ndị ọrụ dị njikere iji. Ọmụmaatụ:

  • BashOperator - onye na-arụ ọrụ maka imezu iwu bash.
  • PythonOperator - onye ọrụ maka ịkpọ koodu Python.
  • EmailOperator - onye ọrụ maka izipu email.
  • HTTPOperator - onye ọrụ maka iji arịrịọ http rụọ ọrụ.
  • SqlOperator - onye ọrụ maka imezu koodu SQL.
  • Sensọ bụ onye na-arụ ọrụ maka ichere ihe omume (ọbịbịa nke oge achọrọ, ọdịdị nke faịlụ achọrọ, ahịrị na nchekwa data, nzaghachi sitere na API, wdg, wdg).

Enwere ndị ọrụ akọwapụtara nke ọma: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Ị nwekwara ike ịmepụta ndị na-arụ ọrụ dabere na njirimara nke gị ma jiri ha na ọrụ gị. Dịka ọmụmaatụ, anyị mepụtara MongoDBToHiveViaHdfsTransfer, onye na-arụ ọrụ maka mbupụ akwụkwọ site na MongoDB gaa na Hive, yana ọtụtụ ndị na-arụ ọrụ maka ịrụ ọrụ na ya. Pịa olọ: CHLoadFromHiveOperator na CHTableLoaderOperator. N'ezie, ozugbo oru ngo ejirila koodu arụnyere na nkwupụta ndị bụ isi, ị nwere ike iche maka ịmepụta ya ka ọ bụrụ nkwupụta ọhụrụ. Nke a ga-eme ka mmepe ọzọ dị mfe, ị ga-agbasa ọba akwụkwọ nke ndị na-arụ ọrụ na ọrụ ahụ.

Na-esote, a ga-emerịrị ihe omume ndị a niile nke ọrụ, ma ugbu a, anyị ga-ekwu maka onye nhazi oge.

Onye nhazi oge

Ewubere onye nhazi ọrụ Airflow Celery. Celery bụ ọba akwụkwọ Python na-enye gị ohere ịhazi kwụ n'ahịrị gbakwunyere asynchronous na nkesa na-arụ ọrụ. N'akụkụ Airflow, a na-ekewa ọrụ niile na ọdọ mmiri. Eji aka na-emepụta ọdọ mmiri. Na-emekarị, ebumnuche ha bụ igbochi oke ọrụ nke iji isi iyi rụọ ọrụ ma ọ bụ depụta ọrụ n'ime DWH. Enwere ike ijikwa ọdọ mmiri site na ntanetị weebụ:

Ikuku ikuku bụ ngwá ọrụ maka ịmepụta ngwa ngwa na ịmepụta usoro nhazi data batch

Ọdọ mmiri ọ bụla nwere oke na ọnụ ọgụgụ oghere. Mgbe ị na-eke DAG, a na-enye ya ọdọ mmiri:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Ọdọ mmiri akọwapụtara na ọkwa DAG nwere ike ịfefe na ọkwa ọrụ.
Usoro dị iche, Onye nhazi oge, bụ maka ịhazi ọrụ niile na Airflow. N'ezie, Scheduler na-arụkọ ọrụ maka sistemụ niile nke ịtọ ọrụ maka mmezu. Ọrụ a na-agafe ọtụtụ usoro tupu emee ya:

  1. Emechaala ọrụ ndị gara aga na DAG; nke ọhụrụ nwere ike ị kwụ n'ahịrị.
  2. A na-ahazi kwụ n'ahịrị dabere na mkpa nke ọrụ (a pụkwara ịchịkwa ihe ndị dị mkpa), ma ọ bụrụ na enwere oghere n'efu na ọdọ mmiri ahụ, a ga-arụ ọrụ ahụ n'ọrụ.
  3. Ọ bụrụ na enwere celery onye ọrụ n'efu, a na-ezigara ya ọrụ ahụ; Ọrụ ahụ ị na-ahazi na nsogbu ahụ na-amalite, na-eji otu onye ọrụ ma ọ bụ ọzọ.

Dị mfe zuru oke.

Onye nhazi oge na-agba ọsọ n'usoro DAG niile yana ọrụ niile dị n'ime DAG.

Maka onye nhazi ịmalite ịrụ ọrụ na DAG, DAG kwesịrị ịtọ oge:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

E nwere usoro ihe atọrọ emebere emebere: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Ị nwekwara ike iji okwu cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Ụbọchị igbu

Iji ghọta ka Airflow si arụ ọrụ, ọ dị mkpa ịghọta ihe Ụbọchị Mkpebi bụ maka DAG. Na Airflow, DAG nwere akụkụ ụbọchị mmezu, ya bụ, dabere na usoro ọrụ DAG, a na-emepụta ihe omume maka ụbọchị mmezu ọ bụla. Na maka ụbọchị mmezu nke ọ bụla, enwere ike ịmegharị ọrụ ọzọ - ma ọ bụ, dịka ọmụmaatụ, DAG nwere ike ịrụ ọrụ n'otu oge n'ọtụtụ ụbọchị mmezu. E gosipụtara nke a nke ọma ebe a:

Ikuku ikuku bụ ngwá ọrụ maka ịmepụta ngwa ngwa na ịmepụta usoro nhazi data batch

N'ụzọ dị mwute (ma ọ bụ ma eleghị anya, ọ dabara nke ọma: ọ dabere na ọnọdụ ahụ), ọ bụrụ na emeziri mmejuputa ọrụ ahụ na DAG, mgbe ahụ, ogbugbu na Ụbọchị Mkpebi gara aga ga-aga n'ihu na-eburu n'uche mgbanwe ndị ahụ. Nke a dị mma ma ọ bụrụ na ịchọrọ ịtụgharị data n'oge gara aga site na iji algọridim ọhụrụ, mana ọ dị njọ n'ihi na nrụpụta nke nsonaazụ ahụ efunahụla (n'ezie, ọ nweghị onye na-enye gị nsogbu iweghachite ụdị koodu isi iyi achọrọ site na Git wee gbakọọ ihe. ị chọrọ otu oge, otu ị chọrọ ya).

Na-emepụta ọrụ

Mmezu nke DAG bụ koodu na Python, yabụ anyị nwere ụzọ dị mma iji belata ọnụọgụ koodu mgbe ị na-arụ ọrụ, dịka ọmụmaatụ, na isi mmalite nke sharded. Ka anyị kwuo na ị nwere atọ MySQL shards dị ka isi iyi, ịkwesịrị ịrịgo n'ime nke ọ bụla wee buru ụfọdụ data. Ọzọkwa, n'onwe ya na n'usoro. Koodu Python dị na DAG nwere ike ịdị ka nke a:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG dị ka nke a:

Ikuku ikuku bụ ngwá ọrụ maka ịmepụta ngwa ngwa na ịmepụta usoro nhazi data batch

N'okwu a, ịnwere ike ịgbakwunye ma ọ bụ wepu shard site na ịhazigharị ntọala na imelite DAG. Nkasi obi!

Ị nwekwara ike iji ọgbọ koodu mgbagwoju anya karị, dịka ọmụmaatụ, na-arụ ọrụ na isi mmalite n'ụdị nchekwa data ma ọ bụ kọwaa nhazi okpokoro, algọridim maka ịrụ ọrụ na tebụl, na, na-eburu n'uche njirimara nke akụrụngwa DWH, mepụta usoro. maka itinye tebụl N n'ime nchekwa gị. Ma ọ bụ, dịka ọmụmaatụ, na-arụ ọrụ na API nke na-adịghị akwado ịrụ ọrụ na paramita n'ụdị ndepụta, ị nwere ike ịmepụta ọrụ N na DAG site na ndepụta a, kpachie nkwekọ nke arịrịọ na API na ọdọ mmiri, na scrape. data dị mkpa sitere na API. Na-agbanwe agbanwe!

ebe nchekwa

Airflow nwere ebe nchekwa azụ azụ ya, nchekwa data (nwere ike ịbụ MySQL ma ọ bụ Postgres, anyị nwere Postgres), nke na-echekwa steeti ọrụ, DAG, ntọala njikọ, mgbanwe ụwa, wdg, wdg. Ebe a ga-amasị m ịsị na ebe nchekwa na Airflow dị nnọọ mfe (ihe dị ka tebụl 20) ma dị mma ma ọ bụrụ na ịchọrọ ịmepụta usoro nke gị ọ bụla n'elu ya. Echetara m tebụl 100500 dị na Informatica repository, nke a ga-amụ ogologo oge tupu ịghọta otú e si ewu ajụjụ.

Nlekota oru

Nyere ịdị mfe nke ebe nchekwa ahụ, ị ​​nwere ike wulite usoro nlekota oru nke dị gị mma. Anyị na-eji akwụkwọ ndetu na Zeppelin, ebe anyị na-elele ọkwa ọrụ:

Ikuku ikuku bụ ngwá ọrụ maka ịmepụta ngwa ngwa na ịmepụta usoro nhazi data batch

Nke a nwekwara ike ịbụ interface weebụ nke Airflow n'onwe ya:

Ikuku ikuku bụ ngwá ọrụ maka ịmepụta ngwa ngwa na ịmepụta usoro nhazi data batch

Koodu ikuku ikuku bụ ebe mepere emepe, yabụ anyị agbakwunyela ịdọ aka na ntị na Telegram. Ihe atụ ọ bụla na-agba ọsọ nke ọrụ, ọ bụrụ na njehie emee, na-eme ka ndị otu ahụ na Telegram, ebe otu mmepe na nkwado niile mejupụtara.

Anyị na-enweta nzaghachi ngwa ngwa site na Telegram (ọ bụrụ na achọrọ), na site na Zeppelin anyị na-enweta foto zuru ezu nke ọrụ na Airflow.

Ọnụ

Ikuku ikuku bụ isi mmalite mepere emepe, ịkwesighi ịtụ anya ọrụ ebube site na ya. Dị njikere itinye oge na mgbalị iji wuo ngwọta na-arụ ọrụ. Ebumnobi a nwere ike imezu, kwere m, ọ bara uru. Ọsọ nke mmepe, mgbanwe, ịdị mfe nke ịgbakwunye usoro ọhụrụ - ọ ga-amasị gị. N'ezie, ịkwesịrị ịṅa ntị nke ukwuu na nhazi nke ọrụ ahụ, nkwụsi ike nke Airflow n'onwe ya: ọrụ ebube adịghị eme.

Ugbu a anyị nwere Airflow na-arụ ọrụ kwa ụbọchị banyere 6,5 puku ọrụ. Ha dị nnọọ iche na agwa. Enwere ọrụ nke itinye data n'ime DWH bụ isi site na isi mmalite dị iche iche na nke akọwapụtara nke ọma, enwere ọrụ nke ịgbakọ ụlọ ahịa n'ime isi DWH, enwere ọrụ nke ibipụta data n'ime DWH ngwa ngwa, enwere ọtụtụ ọrụ dị iche iche - yana Airflow. na-ata ha niile kwa ụbọchị. Na-ekwu na ọnụọgụgụ, nke a bụ 2,3 puku Ọrụ ELT nke mgbagwoju anya dị iche iche n'ime DWH (Hadoop), ihe ruru. 2,5 narị ọdụ data isi mmalite, nke a bụ otu si 4 ndị mmepe ETL, nke kewara na nhazi data ETL na DWH na ELT data nhazi n'ime DWH na n'ezie ihe ndị ọzọ otu admin, onye na-ahụ maka akụrụngwa nke ọrụ ahụ.

Atụmatụ maka ọdịnihu

Ọnụ ọgụgụ nke usoro a na-apụghị izere ezere na-eto eto, na isi ihe anyị ga-eme n'ihe gbasara akụrụngwa nke Airflow bụ ịcha. Anyị chọrọ iwu ụyọkọ Airflow, kenye ndị ọrụ Celery otu ụzọ ụkwụ, ma mee isi ihe na-emegharị onwe ya na usoro nhazi ọrụ na ebe nchekwa.

Epilogue

Nke a, n'ezie, ọ bụghị ihe niile m ga-achọ ịkọrọ banyere Airflow, ma m gbalịrị ime ka isi ihe. Agụụ na-abịa na iri nri, nwaa ya ma ọ ga-amasị gị :)

isi: www.habr.com

Tinye a comment