Sveiks, Habr! Å ajÄ rakstÄ es vÄlos runÄt par vienu lielisku rÄ«ku pakeÅ”u datu apstrÄdes procesu izstrÄdei, piemÄram, korporatÄ«vÄ DWH vai jÅ«su DataLake infrastruktÅ«rÄ. MÄs runÄsim par Apache Airflow (turpmÄk tekstÄ Airflow). Tam ir negodÄ«gi atÅemta HabrĆ© uzmanÄ«ba, un galvenajÄ daÄ¼Ä es mÄÄ£inÄÅ”u jÅ«s pÄrliecinÄt, ka vismaz Airflow ir vÄrts aplÅ«kot, izvÄloties plÄnotÄju saviem ETL/ELT procesiem.
IepriekÅ” es rakstÄ«ju rakstu sÄriju par DWH tÄmu, kad strÄdÄju Tinkoff Bank. Tagad esmu kļuvis par daļu no Mail.Ru Group komandas un izstrÄdÄju platformu datu analÄ«zei spÄļu jomÄ. Faktiski, parÄdoties jaunumiem un interesantiem risinÄjumiem, es un mana komanda Å”eit runÄsim par mÅ«su datu analÄ«zes platformu.
Prologs
TÄtad, sÄksim. Kas ir gaisa plÅ«sma? Å Ä« ir bibliotÄka (vai
Tagad apskatÄ«sim galvenÄs Airflow vienÄ«bas. Izprotot to bÅ«tÄ«bu un mÄrÄ·i, jÅ«s varat optimÄli sakÄrtot savu procesu arhitektÅ«ru. IespÄjams, galvenÄ vienÄ«ba ir virzÄ«tais acikliskais grafiks (turpmÄk tekstÄ ā DAG).
DAG
DAG ir jÅ«su uzdevumu semantiska saistÄ«ba, ko vÄlaties izpildÄ«t stingri noteiktÄ secÄ«bÄ saskaÅÄ ar noteiktu grafiku. Airflow nodroÅ”ina Ärtu tÄ«mekļa saskarni darbam ar DAG un citÄm entÄ«tijÄm:
DAG varÄtu izskatÄ«ties Å”Ädi:
IzstrÄdÄtÄjs, izstrÄdÄjot DAG, nosaka operatoru kopumu, uz kuriem tiks balstÄ«ti uzdevumi DAG ietvaros. Å eit mÄs nonÄkam pie citas svarÄ«gas vienÄ«bas: gaisa plÅ«smas operators.
Operatori
Operators ir entÄ«tija, uz kuras pamata tiek izveidotas darba instances, kas apraksta, kas notiks darba instances izpildes laikÄ.
- BashOperator - operators bash komandas izpildei.
- PythonOperator - operators Python koda izsaukŔanai.
- EmailOperator ā operators e-pasta sÅ«tÄ«Å”anai.
- HTTPOperator - operators darbam ar http pieprasījumiem.
- SqlOperator - operators SQL koda izpildei.
- Sensors ir operators notikuma gaidÄ«Å”anai (vajadzÄ«gÄ laika ieraÅ”anÄs, vajadzÄ«gÄ faila parÄdÄ«Å”anÄs, rinda datu bÄzÄ, atbilde no API utt. utt.).
Ir specifiskÄki operatori: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Varat arÄ« izstrÄdÄt operatorus, pamatojoties uz savÄm Ä«paŔībÄm, un izmantot tos savÄ projektÄ. PiemÄram, mÄs izveidojÄm MongoDBToHiveViaHdfsTransfer, operatoru dokumentu eksportÄÅ”anai no MongoDB uz Hive, un vairÄkus operatorus darbam ar
TÄlÄk visi Å”ie uzdevumu gadÄ«jumi ir jÄizpilda, un tagad mÄs runÄsim par plÄnotÄju.
PlÄnotÄjs
Ir izveidots Airflow uzdevumu plÄnotÄjs
Katram baseinam ir laika niŔu skaita ierobežojums. Izveidojot DAG, tam tiek pieŔķirts baseins:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
DAG lÄ«menÄ« definÄtu pÅ«lu var ignorÄt uzdevuma lÄ«menÄ«.
AtseviŔķs process PlÄnotÄjs ir atbildÄ«gs par visu Airflow uzdevumu plÄnoÅ”anu. Faktiski plÄnotÄjs nodarbojas ar visu izpildes uzdevumu iestatÄ«Å”anas mehÄniku. Pirms izpildes uzdevums iziet vairÄkus posmus:
- IepriekÅ”Äjie uzdevumi DAG ir izpildÄ«ti, var likt jaunu.
- Rinda tiek sakÄrtota atkarÄ«bÄ no uzdevumu prioritÄtes (var arÄ« kontrolÄt prioritÄtes), un, ja baseinÄ ir brÄ«va vieta, uzdevumu var nodot ekspluatÄcijÄ.
- Ja ir brÄ«vs strÄdnieks selerija, uzdevums tiek nosÅ«tÄ«ts tai; sÄkas darbs, ko ieprogrammÄjÄt problÄmÄ, izmantojot vienu vai otru operatoru.
Pietiekami vienkÄrÅ”i.
PlÄnotÄjs darbojas uz visu DAG kopu un visiem uzdevumiem DAG ietvaros.
Lai plÄnotÄjs varÄtu sÄkt strÄdÄt ar DAG, DAG ir jÄiestata grafiks:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Ir gatavu iestatījumu komplekts: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Varat arī izmantot cron izteiksmes:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Izpildes datums
Lai saprastu, kÄ darbojas Airflow, ir svarÄ«gi saprast, kas ir DAG izpildes datums. ProgrammÄ Airflow DAG ir izpildes datuma dimensija, t.i., atkarÄ«bÄ no DAG darba grafika katram izpildes datumam tiek izveidoti uzdevumu gadÄ«jumi. Un katram izpildes datumam uzdevumus var izpildÄ«t atkÄrtoti - vai, piemÄram, DAG var strÄdÄt vienlaikus vairÄkos izpildes datumos. Tas ir skaidri parÄdÄ«ts Å”eit:
DiemžÄl (vai varbÅ«t par laimi: tas ir atkarÄ«gs no situÄcijas), ja uzdevuma izpilde DAG tiks labota, tad izpilde iepriekÅ”ÄjÄ Izpildes datumÄ turpinÄsies, Åemot vÄrÄ korekcijas. Tas ir labi, ja ir jÄpÄrrÄÄ·ina dati par pagÄjuÅ”ajiem periodiem, izmantojot jaunu algoritmu, bet tas ir slikti, jo tiek zaudÄta rezultÄta reproducÄjamÄ«ba (protams, neviens netraucÄ atgriezt nepiecieÅ”amo avota koda versiju no Git un aprÄÄ·inÄt, kas jums vajag vienu reizi, tÄ, kÄ jums tas ir nepiecieÅ”ams).
Uzdevumu Ä£enerÄÅ”ana
DAG ievieÅ”ana ir kods Python, tÄpÄc mums ir ļoti Ärts veids, kÄ samazinÄt koda daudzumu, strÄdÄjot, piemÄram, ar Ŕķeltajiem avotiem. PieÅemsim, ka jums ir trÄ«s MySQL shards kÄ avots, jums jÄiekļaujas katrÄ un jÄiegÅ«st daži dati. TurklÄt neatkarÄ«gi un paralÄli. Python kods DAG var izskatÄ«ties Å”Ädi:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG izskatÄs Å”Ädi:
Å ajÄ gadÄ«jumÄ jÅ«s varat pievienot vai noÅemt Ŕķembu, vienkÄrÅ”i pielÄgojot iestatÄ«jumus un atjauninot DAG. Ärti!
Varat arÄ« izmantot sarežģītÄku koda Ä£enerÄÅ”anu, piemÄram, strÄdÄt ar avotiem datu bÄzes veidÄ vai aprakstÄ«t tabulas struktÅ«ru, algoritmu darbam ar tabulu un, Åemot vÄrÄ DWH infrastruktÅ«ras iezÄ«mes, Ä£enerÄt procesu. lai ielÄdÄtu N tabulu savÄ krÄtuvÄ. Vai, piemÄram, strÄdÄjot ar API, kas neatbalsta darbu ar parametru saraksta veidÄ, varat Ä£enerÄt N uzdevumus DAG no Ŕī saraksta, ierobežot API pieprasÄ«jumu paralÄlismu lÄ«dz pÅ«lam un nokasÄ«t nepiecieÅ”amie dati no API. ElastÄ«gi!
krÄtuve
Airflow ir savs aizmugures repozitorijs, datu bÄze (var bÅ«t MySQL vai Postgres, mums ir Postgres), kurÄ tiek glabÄti uzdevumu stÄvokļi, DAG, savienojuma iestatÄ«jumi, globÄlie mainÄ«gie utt., utt. Å eit es gribÄtu teikt, ka Repozitorijs programmÄ Airflow ir ļoti vienkÄrÅ”s (apmÄram 20 tabulas) un Ärts, ja vÄlaties tai papildinÄt kÄdu no saviem procesiem. Atceros 100500 XNUMX tabulas Informatica repozitorijÄ, kuras bija ilgi jÄpÄta, pirms sapratu, kÄ veidot vaicÄjumu.
Uzraudzība
Å emot vÄrÄ repozitorija vienkÄrŔību, varat izveidot sev Ärtu uzdevumu uzraudzÄ«bas procesu. MÄs Zeppelin izmantojam piezÄ«mju grÄmatiÅu, kurÄ aplÅ«kojam uzdevumu statusu:
TÄ varÄtu bÅ«t arÄ« paÅ”as Airflow tÄ«mekļa saskarne:
Airflow kods ir atvÄrtÄ koda, tÄpÄc mÄs esam pievienojuÅ”i brÄ«dinÄjumus Telegram. Katrs palaist uzdevuma gadÄ«jums, ja rodas kļūda, izsÅ«ta surogÄtpastu Telegram grupai, kurÄ ir visa izstrÄdes un atbalsta komanda.
MÄs saÅemam tÅ«lÄ«tÄju atbildi, izmantojot Telegram (ja nepiecieÅ”ams), un caur Zeppelin mÄs saÅemam vispÄrÄju priekÅ”statu par uzdevumiem Airflow.
KopÄ
Gaisa plÅ«sma galvenokÄrt ir atvÄrtÄ koda avots, un no tÄ nevajadzÄtu gaidÄ«t brÄ«numus. Esiet gatavs veltÄ«t laiku un pÅ«les, lai izveidotu risinÄjumu, kas darbojas. MÄrÄ·is ir sasniedzams, ticiet man, tas ir tÄ vÄrts. AttÄ«stÄ«bas Ätrums, elastÄ«ba, jaunu procesu pievienoÅ”anas vieglums - jums tas patiks. Protams, liela uzmanÄ«ba jÄpievÄrÅ” projekta organizÄcijai, paÅ”as Airflow stabilitÄtei: brÄ«numi nenotiek.
Tagad Airflow strÄdÄ katru dienu apmÄram 6,5 tÅ«kstoÅ”i uzdevumu. Tie ir diezgan atŔķirÄ«gi pÄc rakstura. Ir uzdevumi ielÄdÄt datus galvenajÄ DWH no daudziem dažÄdiem un ļoti specifiskiem avotiem, ir uzdevumi, lai aprÄÄ·inÄtu veikalu fasÄdes galvenajÄ DWH, ir uzdevumi publicÄt datus ÄtrÄ DWH, ir daudz, daudz dažÄdu uzdevumu - un Airflow. koÅ”Ä¼Ä tos visus dienu no dienas. RunÄjot skaitļos, tas ir 2,3 tÅ«kstoÅ”i DažÄdas sarežģītÄ«bas ELT uzdevumi DWH (Hadoop) ietvaros, apm. 2,5 simti datu bÄzu avotiem, Ŕī ir komanda no 4 ETL izstrÄdÄtÄji, kas ir sadalÄ«ti ETL datu apstrÄdÄ DWH un ELT datu apstrÄdÄ DWH iekÅ”ienÄ un, protams, vairÄk viens admins, kas nodarbojas ar pakalpojuma infrastruktÅ«ru.
PlÄni nÄkotnei
Procesu skaits neizbÄgami pieaug, un galvenais, ko mÄs darÄ«sim saistÄ«bÄ ar Airflow infrastruktÅ«ru, ir mÄrogoÅ”ana. MÄs vÄlamies izveidot Airflow klasteru, pieŔķirt Selery strÄdniekiem kÄju pÄri un izveidot paÅ”dublÄjoÅ”u galvu ar darba plÄnoÅ”anas procesiem un repozitoriju.
Epilogs
Tas, protams, nav viss, ko es vÄlÄtos pastÄstÄ«t par Airflow, bet es centos izcelt galvenos punktus. ApetÄ«te nÄk Ädot, pamÄÄ£ini un tev patiks :)
Avots: www.habr.com