Habari, Habr! Katika makala hii nataka kuzungumza juu ya chombo kimoja kikubwa cha kuendeleza michakato ya usindikaji wa data ya kundi, kwa mfano, katika miundombinu ya DWH ya shirika au DataLake yako. Tutazungumza juu ya Apache Airflow (hapa inajulikana kama Airflow). Imenyimwa tahadhari isivyo haki kwa Habré, na kwa sehemu kuu nitajaribu kukushawishi kwamba angalau Airflow inafaa kuangaliwa unapochagua kipanga ratiba kwa michakato yako ya ETL/ELT.
Hapo awali, niliandika mfululizo wa makala juu ya mada ya DWH nilipokuwa nikifanya kazi katika Benki ya Tinkoff. Sasa nimekuwa sehemu ya timu ya Mail.Ru Group na ninaunda jukwaa la uchambuzi wa data katika eneo la michezo ya kubahatisha. Kwa kweli, habari na masuluhisho ya kuvutia yanapoonekana, mimi na timu yangu tutazungumza hapa kuhusu jukwaa letu la uchanganuzi wa data.
Dibaji
Kwa hiyo, hebu tuanze. Airflow ni nini? Hii ni maktaba (au
Sasa hebu tuangalie vyombo kuu vya Airflow. Kwa kuelewa kiini na madhumuni yao, unaweza kupanga kikamilifu usanifu wako wa mchakato. Labda huluki kuu ni Grafu Inayoelekezwa ya Acyclic (hapa inajulikana kama DAG).
DAG
DAG ni uhusiano fulani wa maana wa kazi zako ambao ungependa kukamilisha katika mlolongo uliobainishwa kabisa kulingana na ratiba mahususi. Airflow hutoa kiolesura cha wavuti kinachofaa kwa kufanya kazi na DAG na vyombo vingine:
DAG inaweza kuonekana kama hii:
Msanidi programu, wakati wa kuunda DAG, anaweka seti ya waendeshaji ambayo kazi ndani ya DAG itajengwa. Hapa tunakuja kwa chombo kingine muhimu: Opereta ya Utiririshaji wa hewa.
Waendeshaji
Opereta ni huluki kwa misingi ambayo matukio ya kazi yanaundwa, ambayo inaelezea kitakachotokea wakati wa utekelezaji wa mfano wa kazi.
- BashOperator - mwendeshaji wa kutekeleza amri ya bash.
- PythonOperator - mwendeshaji wa kupiga nambari ya Python.
- EmailOperator - mwendeshaji wa kutuma barua pepe.
- HTTPOperator - operator kwa kufanya kazi na maombi ya http.
- SqlOperator - mwendeshaji wa kutekeleza msimbo wa SQL.
- Sensor ni operator wa kusubiri tukio (kuwasili kwa muda unaohitajika, kuonekana kwa faili inayohitajika, mstari katika hifadhidata, jibu kutoka kwa API, nk, nk).
Kuna waendeshaji maalum zaidi: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Unaweza pia kuendeleza waendeshaji kulingana na sifa zako mwenyewe na kuzitumia katika mradi wako. Kwa mfano, tuliunda MongoDBToHiveViaHdfsTransfer, opereta wa kusafirisha hati kutoka MongoDB hadi Hive, na waendeshaji kadhaa kwa kufanya kazi nao.
Ifuatayo, matukio haya yote ya kazi yanahitaji kutekelezwa, na sasa tutazungumza juu ya mpangilio.
Mratibu
Kipanga ratiba cha kazi cha Airflow kimejengwa juu yake
Kila bwawa lina kikomo kwa idadi ya inafaa. Wakati wa kuunda DAG, inapewa bwawa:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Dimbwi lililofafanuliwa katika kiwango cha DAG linaweza kubatilishwa katika kiwango cha kazi.
Mchakato tofauti, Mratibu, ana jukumu la kuratibu kazi zote katika Airflow. Kwa kweli, Mratibu hushughulika na mbinu zote za kuweka kazi za utekelezaji. Kazi hupitia hatua kadhaa kabla ya kutekelezwa:
- Kazi za awali zimekamilishwa katika DAG; nyingine mpya inaweza kuwekwa kwenye foleni.
- Foleni imepangwa kulingana na kipaumbele cha kazi (vipaumbele vinaweza pia kudhibitiwa), na ikiwa kuna slot ya bure kwenye bwawa, kazi inaweza kuchukuliwa kwa uendeshaji.
- Ikiwa kuna celery ya mfanyakazi wa bure, kazi inatumwa kwake; kazi uliyopanga kwenye shida huanza, kwa kutumia mwendeshaji mmoja au mwingine.
Rahisi kutosha.
Kiratibu hutekeleza seti ya DAG zote na kazi zote ndani ya DAG.
Ili Mratibu aanze kufanya kazi na DAG, DAG inahitaji kuweka ratiba:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Kuna seti ya vifaa vilivyotengenezwa tayari: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Unaweza pia kutumia misemo ya cron:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Tarehe ya Utekelezaji
Ili kuelewa jinsi Airflow inavyofanya kazi, ni muhimu kuelewa Tarehe ya Utekelezaji ni nini kwa DAG. Katika mtiririko wa hewa, DAG ina kipimo cha Tarehe ya Utekelezaji, yaani, kulingana na ratiba ya kazi ya DAG, matukio ya majukumu yanaundwa kwa kila Tarehe ya Utekelezaji. Na kwa kila Tarehe ya Utekelezaji, kazi zinaweza kutekelezwa tena - au, kwa mfano, DAG inaweza kufanya kazi wakati huo huo katika Tarehe kadhaa za Utekelezaji. Hii inaonyeshwa wazi hapa:
Kwa bahati mbaya (au labda kwa bahati nzuri: inategemea hali hiyo), ikiwa utekelezaji wa kazi katika DAG unarekebishwa, basi utekelezaji katika Tarehe ya Utekelezaji uliopita utaendelea kuzingatia marekebisho. Hii ni nzuri ikiwa unahitaji kuhesabu tena data katika vipindi vya zamani kwa kutumia algorithm mpya, lakini ni mbaya kwa sababu uboreshaji wa matokeo umepotea (kwa kweli, hakuna mtu anayekusumbua kurudisha toleo linalohitajika la nambari ya chanzo kutoka Git na kuhesabu ni nini. unahitaji wakati mmoja, jinsi unavyohitaji).
Kuzalisha kazi
Utekelezaji wa DAG ni msimbo katika Python, kwa hiyo tuna njia rahisi sana ya kupunguza kiasi cha msimbo wakati wa kufanya kazi, kwa mfano, na vyanzo vya sharded. Wacha tuseme una shards tatu za MySQL kama chanzo, unahitaji kupanda ndani ya kila moja na kuchukua data fulani. Aidha, kwa kujitegemea na kwa sambamba. Nambari ya Python katika DAG inaweza kuonekana kama hii:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG inaonekana kama hii:
Katika kesi hii, unaweza kuongeza au kuondoa shard kwa kurekebisha tu mipangilio na uppdatering DAG. Raha!
Unaweza pia kutumia uundaji wa nambari ngumu zaidi, kwa mfano, kufanya kazi na vyanzo katika mfumo wa hifadhidata au kuelezea muundo wa meza, algorithm ya kufanya kazi na meza, na, kwa kuzingatia sifa za miundombinu ya DWH, toa mchakato. kwa kupakia jedwali N kwenye hifadhi yako. Au, kwa mfano, kufanya kazi na API ambayo haiauni kufanya kazi na kigezo katika mfumo wa orodha, unaweza kutoa kazi za N kwenye DAG kutoka kwenye orodha hii, kupunguza ulinganifu wa maombi katika API hadi kwenye dimbwi, na kukwaruza. data muhimu kutoka kwa API. Inabadilika!
hazina
Airflow ina hazina yake ya nyuma, hifadhidata (inaweza kuwa MySQL au Postgres, tuna Postgres), ambayo huhifadhi majimbo ya kazi, DAG, mipangilio ya muunganisho, vigeu vya kimataifa, n.k., n.k. Hapa ningependa niseme kwamba hazina katika Airflow ni rahisi sana (takriban jedwali 20) na ni rahisi ikiwa unataka kuunda michakato yako mwenyewe juu yake. Nakumbuka jedwali 100500 kwenye hazina ya Informatica, ambayo ilibidi isomwe kwa muda mrefu kabla ya kuelewa jinsi ya kuunda swala.
Ufuatiliaji
Kwa kuzingatia unyenyekevu wa hazina, unaweza kuunda mchakato wa ufuatiliaji wa kazi ambao unafaa kwako. Tunatumia daftari huko Zeppelin, ambapo tunaangalia hali ya kazi:
Hii pia inaweza kuwa kiolesura cha wavuti cha Airflow yenyewe:
Msimbo wa Airflow ni chanzo huria, kwa hivyo tumeongeza arifa kwa Telegram. Kila mfano unaoendelea wa kazi, ikiwa hitilafu itatokea, hutuma barua taka kwenye kikundi kwenye Telegramu, ambapo timu nzima ya uendelezaji na usaidizi inajumuisha.
Tunapokea jibu la haraka kupitia Telegramu (ikihitajika), na kupitia Zeppelin tunapokea picha ya jumla ya kazi katika Airflow.
Katika jumla ya
Airflow kimsingi ni chanzo wazi, na hupaswi kutarajia miujiza kutoka kwayo. Kuwa tayari kuweka wakati na juhudi kujenga suluhisho linalofanya kazi. Lengo linaweza kufikiwa, niamini, inafaa. Kasi ya maendeleo, kubadilika, urahisi wa kuongeza michakato mpya - utaipenda. Bila shaka, unahitaji kulipa kipaumbele sana kwa shirika la mradi huo, utulivu wa Airflow yenyewe: miujiza haifanyiki.
Sasa tuna Airflow inafanya kazi kila siku kuhusu kazi elfu 6,5. Wao ni tofauti kabisa katika tabia. Kuna kazi za kupakia data kwenye DWH kuu kutoka kwa vyanzo vingi tofauti na maalum sana, kuna kazi za kuhesabu mbele ya duka ndani ya DWH kuu, kuna kazi za kuchapisha data kwenye DWH ya haraka, kuna kazi nyingi, nyingi tofauti - na Airflow. huwatafuna wote siku baada ya siku. Kuzungumza kwa idadi, hii ni 2,3 elfu Kazi za ELT za ugumu tofauti ndani ya DWH (Hadoop), takriban. hifadhidata mia 2,5 vyanzo, hii ni timu kutoka Watengenezaji 4 wa ETL, ambazo zimegawanywa katika usindikaji wa data wa ETL katika usindikaji wa data wa DWH na ELT ndani ya DWH na bila shaka zaidi msimamizi mmoja, anayeshughulika na miundombinu ya huduma.
Mipango ya siku zijazo
Idadi ya michakato inakua bila kuepukika, na jambo kuu ambalo tutakuwa tukifanya katika suala la miundombinu ya Airflow ni kuongeza. Tunataka kuunda kikundi cha Airflow, kutenga jozi ya miguu kwa wafanyikazi wa Celery, na kutengeneza kichwa cha kujinakili chenye michakato ya kuratibu kazi na hazina.
Epilogue
Hii, kwa kweli, sio kila kitu ambacho ningependa kusema juu ya Airflow, lakini nilijaribu kuangazia mambo kuu. Hamu inakuja na kula, jaribu na utaipenda :)
Chanzo: mapenzi.com