Bawo, Emi ni Dmitry Logvinenko - Onimọ-ẹrọ data ti Ẹka atupale ti ẹgbẹ Vezet ti awọn ile-iṣẹ.
Emi yoo sọ fun ọ nipa ohun elo iyalẹnu kan fun idagbasoke awọn ilana ETL - Apache Airflow. Ṣugbọn Airflow jẹ wapọ ati multifaceted ti o yẹ ki o wo ni pẹkipẹki paapaa ti o ko ba ni ipa ninu awọn ṣiṣan data, ṣugbọn ni iwulo lati ṣe ifilọlẹ eyikeyi awọn ilana lorekore ati ṣe atẹle ipaniyan wọn.
Ati bẹẹni, Emi kii yoo sọ nikan, ṣugbọn tun fihan: eto naa ni ọpọlọpọ awọn koodu, awọn sikirinisoti ati awọn iṣeduro.
Ohun ti o maa n rii nigbati o ba google ọrọ Airflow / Wikimedia Commons
- dara nikan, ati ṣe fun awọn idi ti o yatọ patapata, eyun (gẹgẹbi a ti kọ ṣaaju kita):
nṣiṣẹ ati abojuto awọn iṣẹ ṣiṣe lori nọmba ailopin ti awọn ẹrọ (bii ọpọlọpọ Seleri / Kubernetes ati ẹri-ọkan rẹ yoo gba ọ laaye)
pẹlu iran iṣan-iṣẹ agbara lati rọrun pupọ lati kọ ati loye koodu Python
ati agbara lati sopọ eyikeyi awọn apoti isura infomesonu ati awọn API pẹlu ara wọn nipa lilo awọn paati ti a ti ṣetan ati awọn afikun ile (eyiti o rọrun pupọju).
A lo Apache Airflow bii eyi:
a gba data lati orisirisi awọn orisun (ọpọlọpọ awọn SQL Server ati PostgreSQL apeere, orisirisi APIs pẹlu ohun elo metiriki, ani 1C) ni DWH ati ODS (a ni Vertica ati Clickhouse).
bi o ti ni ilọsiwaju cron, eyiti o bẹrẹ awọn ilana isọdọkan data lori ODS, ati tun ṣe abojuto itọju wọn.
Titi di aipẹ, awọn iwulo wa ni aabo nipasẹ olupin kekere kan pẹlu awọn ohun kohun 32 ati 50 GB ti Ramu. Ni Airflow, eyi ṣiṣẹ:
siwaju sii 200 agba (Nitootọ ṣiṣan iṣẹ, ninu eyiti a ṣe awọn iṣẹ ṣiṣe),
ni kọọkan lori apapọ 70 awọn iṣẹ-ṣiṣe,
Oore yii bẹrẹ (tun ni apapọ) lẹẹkan wakati kan.
Emi yoo kọ nipa bii a ṣe gbooro si isalẹ, ṣugbọn nisisiyi jẹ ki a ṣalaye iṣẹ-ṣiṣe über ti a yoo yanju:
Awọn olupin SQL orisun mẹta wa, ọkọọkan pẹlu awọn apoti isura infomesonu 50 - awọn apẹẹrẹ ti iṣẹ akanṣe kan, ni atele, wọn ni eto kanna (fere nibikibi, mua-ha-ha), eyiti o tumọ si pe ọkọọkan ni tabili Awọn aṣẹ (O daa, tabili pẹlu iyẹn). orukọ le jẹ Titari sinu eyikeyi iṣowo). A gba data naa nipa fifi awọn aaye iṣẹ kun (olupin orisun, ibi ipamọ data orisun, ID iṣẹ-ṣiṣe ETL) ati ni irọra sọ wọn sinu, sọ, Vertica.
Lọ!
Apa akọkọ, ilowo (ati imọ-jinlẹ diẹ)
Kini idi ti awa (ati iwọ)
Nigbati awọn igi tobi ati pe Mo rọrun SQL-schik ninu soobu ara ilu Rọsia kan, a scammed awọn ilana ETL aka ṣiṣan data nipa lilo awọn irinṣẹ meji ti o wa fun wa:
Ile-iṣẹ Agbara Informatica - eto ti ntan kaakiri, iṣelọpọ pupọ, pẹlu ohun elo tirẹ, ẹya tirẹ. Bi o ṣe fẹ, Mo lo 1% ti awọn agbara rẹ. Kí nìdí? O dara, ni akọkọ, wiwo yii fi titẹ ẹmi si wa ni ibikan sẹhin ni awọn ọdun 380. Ni ẹẹkeji, ilodisi yii jẹ apẹrẹ fun awọn ilana ti o wuyi pupọ, atunlo paati ibinu ati awọn ẹtan ile-iṣẹ to ṣe pataki pupọ. A kii yoo sọ ohunkohun nipa otitọ pe o jẹ iye to bi apakan Airbus AXNUMX fun ọdun kan.
Ṣọra, sikirinifoto le ṣe ipalara fun awọn eniyan labẹ ọdun 30 diẹ
SQL Server Integration Server - a lo ẹlẹgbẹ yii ninu awọn ṣiṣan inu-iṣẹ wa. O dara, ni otitọ: a ti lo SQL Server tẹlẹ, ati pe yoo jẹ aimọgbọnwa bakanna lati ma lo awọn irinṣẹ ETL rẹ. Ohun gbogbo ti o wa ninu rẹ dara: mejeeji ni wiwo jẹ lẹwa, ati awọn ijabọ ilọsiwaju ... Ṣugbọn eyi kii ṣe idi ti a fi fẹran awọn ọja sọfitiwia, oh, kii ṣe fun eyi. Ẹya rẹ dtsx (eyiti o jẹ XML pẹlu awọn apa ti o dapọ nigbati o ba fipamọ) a le, ṣugbọn kini aaye naa? Bawo ni nipa ṣiṣe akojọpọ awọn iṣẹ-ṣiṣe ti yoo fa awọn tabili ọgọrun lati ọdọ olupin kan si ekeji? Kilode, ọgọrun, ogun ninu wọn yoo jẹ ki ika ika rẹ ṣubu lakoko titẹ bọtini Asin naa. Ṣugbọn dajudaju o dabi asiko diẹ sii:
Dajudaju a wa awọn ọna jade. Ọran paapaa fere wa si olupilẹṣẹ package SSIS ti ara ẹni…
… ati lẹhinna iṣẹ tuntun kan rii mi. Apache Airflow si bori mi lori rẹ.
Nigbati mo rii pe awọn apejuwe ilana ETL jẹ koodu Python rọrun, Mo kan ko jo fun ayọ. Eyi ni bii awọn ṣiṣan data ṣe jẹ ẹya ati iyatọ, ati sisọ awọn tabili pẹlu eto kan lati awọn ọgọọgọrun awọn data data sinu ibi-afẹde kan di ọrọ ti koodu Python ni ọkan ati idaji tabi meji awọn iboju 13 ”.
Nto iṣupọ
Jẹ ki a ko ṣeto ile-ẹkọ jẹle-osinmi patapata, ki a ma sọrọ nipa awọn nkan ti o han gbangba nibi, bii fifi sori ẹrọ Airflow, ibi ipamọ data ti o yan, Seleri ati awọn ọran miiran ti a ṣalaye ninu awọn docks.
Ki a le lẹsẹkẹsẹ bẹrẹ awọn adanwo, Mo sketched docker-compose.yml ninu eyiti:
Jẹ ki a gbe ga gaan Fife ategun: Iṣeto, Webserver. Flower yoo tun ti wa ni nyi nibẹ lati bojuto awọn Seleri awọn iṣẹ-ṣiṣe (nitori o ti tẹlẹ ti ti sinu apache/airflow:1.10.10-python3.7ṣugbọn a ko bikita)
PostgreSQL, Ninu eyiti Airflow yoo kọ alaye iṣẹ rẹ (data oluṣeto, awọn iṣiro ipaniyan, ati bẹbẹ lọ), ati Celery yoo samisi awọn iṣẹ ṣiṣe ti pari;
Redis, eyi ti yoo ṣiṣẹ bi alagbata iṣẹ-ṣiṣe fun Seleri;
Osise seleri, eyi ti yoo wa ni olukoni ni taara ipaniyan ti awọn iṣẹ-ṣiṣe.
Si folda ./dags A yoo ṣe akojọpọ awọn faili wa pẹlu awọn apejuwe ti awọn ika. Wọn yoo gbe soke lori fo, nitorina ko si iwulo lati gbe gbogbo akopọ lẹhin igbati kọọkan.
Ni diẹ ninu awọn aaye, koodu ti o wa ninu awọn apẹẹrẹ ko han patapata (ki o má ba ṣe idamu ọrọ naa), ṣugbọn ibikan ni o ṣe atunṣe ninu ilana naa. Awọn apẹẹrẹ koodu iṣẹ pipe ni a le rii ni ibi ipamọ https://github.com/dm-logv/airflow-tutorial.
Ninu apejọ ti akopọ, Mo gbarale pupọ lori aworan ti a mọ daradara puckel / docker-afẹfẹ - rii daju lati ṣayẹwo. Boya o ko nilo ohunkohun miiran ninu aye re.
Gbogbo Airflow eto wa ko nipasẹ nikan airflow.cfg, sugbon tun nipasẹ ayika oniyipada (o ṣeun si awọn Difelopa), eyi ti mo ti irira lo anfani ti.
Nipa ti, kii ṣe iṣelọpọ-ṣetan: Emi ko mọọmọ ko fi awọn aiya sinu awọn apoti, Emi ko ni wahala pẹlu aabo. Sugbon mo ti ṣe awọn kere dara fun wa experimenters.
Ṣe akiyesi pe:
Apoti dag gbọdọ wa ni iraye si awọn oluṣeto ati awọn oṣiṣẹ.
Kanna kan si gbogbo awọn ile-ikawe ẹni-kẹta - gbogbo wọn gbọdọ fi sori ẹrọ lori awọn ẹrọ pẹlu oluṣeto ati awọn oṣiṣẹ.
O dara, bayi o rọrun:
$ docker-compose up --scale worker=3
Lẹhin ohun gbogbo ti dide, o le wo awọn atọkun wẹẹbu:
Ti o ko ba loye ohunkohun ninu gbogbo awọn “dags” wọnyi, lẹhinna eyi ni iwe-itumọ kukuru kan:
scheduler - aburo pataki julọ ni Airflow, ẹniti o ṣakoso pe awọn roboti ṣiṣẹ takuntakun, kii ṣe eniyan: ṣe abojuto iṣeto, awọn imudojuiwọn dags, awọn ifilọlẹ awọn iṣẹ ṣiṣe.
Ni gbogbogbo, ni awọn ẹya agbalagba, o ni awọn iṣoro pẹlu iranti (rara, kii ṣe amnesia, ṣugbọn awọn n jo) ati paramita ohun-ini paapaa wa ninu awọn atunto. run_duration - awọn oniwe-tun aarin. Ṣugbọn nisisiyi ohun gbogbo dara.
DAG (aka "dag") - "aworan acyclic ti o darí", ṣugbọn iru itumọ bẹẹ yoo sọ fun eniyan diẹ, ṣugbọn ni otitọ o jẹ eiyan fun awọn iṣẹ ṣiṣe ti o nlo pẹlu ara wọn (wo isalẹ) tabi afọwọṣe ti Package ni SSIS ati Ise-iṣẹ ni Informatica .
Ni afikun si awọn dags, nibẹ ni o le tun jẹ sabdags, sugbon a julọ seese yoo ko gba si wọn.
DAG Run - initialized dag, eyi ti o ti wa ni sọtọ awọn oniwe-ara execution_date. Dagrans ti ọkan dag le ṣiṣẹ ni afiwe (ti o ba jẹ pe, dajudaju, o ti jẹ ki awọn iṣẹ ṣiṣe rẹ lagbara).
onišẹ jẹ awọn ege koodu ti o ni iduro fun ṣiṣe iṣe kan pato. Awọn oriṣi mẹta ti awọn oniṣẹ wa:
igbesebi ayanfẹ wa PythonOperator, eyi ti o le ṣiṣẹ eyikeyi (wulo) Python koodu;
gbigbe, eyiti o gbe data lati ibi de ibi, sọ pe, MsSqlToHiveTransfer;
sensọ ni apa keji, yoo gba ọ laaye lati fesi tabi fa fifalẹ ipaniyan siwaju sii ti dag titi iṣẹlẹ yoo fi waye. HttpSensor le fa awọn pàtó kan endpoint, ati nigbati awọn ti o fẹ esi ti wa ni nduro, bẹrẹ awọn gbigbe GoogleCloudStorageToS3Operator. Okan oniwadii yoo beere pe: “Kilode? Lẹhinna, o le ṣe awọn atunwi taara ninu oniṣẹ ẹrọ!” Ati lẹhinna, ni ibere ki o má ba di adagun awọn iṣẹ-ṣiṣe pẹlu awọn oniṣẹ ti daduro. Sensọ bẹrẹ, ṣayẹwo ati ku ṣaaju igbiyanju atẹle.
Išẹ - Awọn oniṣẹ ti a kede, laibikita iru, ati ti a so mọ dag naa ni igbega si ipo iṣẹ-ṣiṣe.
apẹẹrẹ iṣẹ-ṣiṣe - nigbati oluṣeto gbogbogbo pinnu pe o to akoko lati fi awọn iṣẹ-ṣiṣe ranṣẹ si ogun lori awọn oṣere-iṣẹ (ọtun ni aaye, ti a ba lo. LocalExecutor tabi lati kan latọna ipade ni irú ti CeleryExecutor), o fi aaye kan fun wọn (ie, eto awọn oniyipada - awọn aye ipaniyan), faagun aṣẹ tabi awọn awoṣe ibeere, ati awọn adagun-omi wọn.
A ṣe ipilẹṣẹ awọn iṣẹ-ṣiṣe
Ni akọkọ, jẹ ki a ṣe ilana ilana gbogbogbo ti aja wa, ati lẹhinna a yoo lọ sinu awọn alaye siwaju ati siwaju sii, nitori a lo diẹ ninu awọn solusan ti kii ṣe bintin.
Nitorinaa, ni ọna ti o rọrun julọ, iru dag kan yoo dabi eyi:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Jẹ ki a ro ero rẹ:
Ni akọkọ, a gbe wọle awọn libs pataki ati nkan miran;
sql_server_ds Ṣe List[namedtuple[str, str]] pẹlu awọn orukọ ti awọn asopọ lati Airflow Connections ati awọn database lati eyi ti a yoo gba wa awo;
dag - ikede dag wa, eyiti o gbọdọ wa ni dandan globals(), bibẹkọ ti Airflow yoo ko ri o. Doug tun nilo lati sọ:
Kini oruko re orders - orukọ yii yoo han lẹhinna ni wiwo wẹẹbu,
pé yóò ṣiṣẹ́ láti ọ̀gànjọ́ òru ọjọ́ kẹjọ, oṣù keje,
ati pe o yẹ ki o ṣe ifilọlẹ isunmọ ni gbogbo awọn wakati 6 (fun awọn eniyan tutu, nibi dipo timedelta() iyọọda cron-ila 0 0 0/6 ? * * *, fun awọn kere itura - ẹya ikosile bi @daily);
workflow() yoo ṣe akọkọ ise, sugbon ko bayi. Ni bayi, a kan yoo da ọrọ-ọrọ wa silẹ sinu akọọlẹ naa.
Ati nisisiyi idan ti o rọrun ti ṣiṣẹda awọn iṣẹ-ṣiṣe:
a ṣiṣe nipasẹ awọn orisun wa;
ipilẹṣẹ PythonOperator, eyi ti yoo ṣiṣẹ dummy wa workflow(). Maṣe gbagbe lati pato orukọ alailẹgbẹ (laarin dag) ti iṣẹ-ṣiṣe ki o di dag naa funrararẹ. Flag provide_context ni titan, yoo tú awọn ariyanjiyan afikun sinu iṣẹ naa, eyiti a yoo farabalẹ gba ni lilo **context.
Fun bayi, iyẹn ni gbogbo. Ohun ti a ni:
dag tuntun ni wiwo wẹẹbu,
awọn iṣẹ-ṣiṣe kan ati idaji ti yoo ṣe ni afiwe (ti Airflow, Seleri eto ati agbara olupin gba laaye).
O dara, o fẹrẹ gba.
Tani yoo fi sori ẹrọ awọn igbẹkẹle naa?
Lati rọrun gbogbo nkan yii, Mo wọ inu docker-compose.yml processing requirements.txt lori gbogbo apa.
A duro diẹ, awọn iṣẹ-ṣiṣe ti wa ni imudani nipasẹ awọn oṣiṣẹ:
Awọn alawọ ewe, dajudaju, ti pari iṣẹ wọn ni ifijišẹ. Reds ko ni aṣeyọri pupọ.
Nipa ọna, ko si folda lori ọja wa ./dags, ko si amuṣiṣẹpọ laarin awọn ẹrọ - gbogbo awọn dags dubulẹ ni git lori Gitlab wa, ati Gitlab CI n pin awọn imudojuiwọn si awọn ẹrọ nigbati o ba dapọ sinu master.
Diẹ nipa Flower
Lakoko ti awọn oṣiṣẹ n pa awọn pacifiers wa, jẹ ki a ranti irinṣẹ miiran ti o le fi nkan han wa - Flower.
Oju-iwe akọkọ pupọ pẹlu alaye akojọpọ lori awọn apa oṣiṣẹ:
Oju-iwe ti o lagbara julọ pẹlu awọn iṣẹ ṣiṣe ti o lọ si iṣẹ:
Oju-iwe alaidun julọ pẹlu ipo ti alagbata wa:
Oju-iwe didan julọ wa pẹlu awọn aworan ipo iṣẹ-ṣiṣe ati akoko ipaniyan wọn:
A fifuye awọn underloaded
Nitorinaa, gbogbo awọn iṣẹ ṣiṣe ti ṣiṣẹ, o le gbe awọn ti o gbọgbẹ lọ.
Ati ọpọlọpọ awọn ti o gbọgbẹ - fun idi kan tabi omiiran. Ninu ọran ti lilo deede ti Airflow, awọn onigun mẹrin wọnyi tọka pe pato data ko de.
O nilo lati wo akọọlẹ naa ki o tun bẹrẹ awọn iṣẹlẹ iṣẹ-ṣiṣe ti o ṣubu.
Nipa tite lori eyikeyi onigun mẹrin, a yoo rii awọn iṣe ti o wa fun wa:
O le mu ki o ṣe Ko awọn ti o ṣubu kuro. Iyẹn ni, a gbagbe pe nkan kan ti kuna nibẹ, ati pe iṣẹ-ṣiṣe apẹẹrẹ kanna yoo lọ si oluṣeto.
O han gbangba pe ṣiṣe eyi pẹlu Asin pẹlu gbogbo awọn onigun mẹrin pupa kii ṣe eniyan pupọ - eyi kii ṣe ohun ti a nireti lati Airflow. Nipa ti ara, a ni awọn ohun ija iparun: Browse/Task Instances
Jẹ ki a yan ohun gbogbo ni ẹẹkan ki a tunto si odo, tẹ ohun kan ti o pe:
Lẹhin mimọ, awọn takisi wa dabi eyi (wọn ti nduro tẹlẹ fun oluṣeto lati ṣeto wọn):
Awọn asopọ, awọn iwọ ati awọn oniyipada miiran
O to akoko lati wo DAG atẹle, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Njẹ gbogbo eniyan ti ṣe imudojuiwọn ijabọ kan? Eyi ni lẹẹkansi: atokọ ti awọn orisun wa lati ibiti o ti gba data naa; nibẹ ni a akojọ ibi ti lati fi; maṣe gbagbe lati honk nigbati ohun gbogbo ṣẹlẹ tabi fọ (daradara, eyi kii ṣe nipa wa, rara).
Jẹ ki a lọ nipasẹ faili naa lẹẹkansi ki o wo nkan tuntun ti o ṣofo:
from commons.operators import TelegramBotSendMessage - Ko si ohun ti o ṣe idiwọ fun wa lati ṣe awọn oniṣẹ ti ara wa, eyiti a lo anfani nipasẹ ṣiṣe apẹja kekere kan fun fifiranṣẹ awọn ifiranṣẹ si Ṣii silẹ. (A yoo sọrọ diẹ sii nipa oniṣẹ ẹrọ yii ni isalẹ);
default_args={} - dag le pin kaakiri awọn ariyanjiyan kanna si gbogbo awọn oniṣẹ rẹ;
to='{{ var.value.all_the_kings_men }}' - aaye to a kii yoo ni koodu lile, ṣugbọn ti ipilẹṣẹ ni agbara ni lilo Jinja ati oniyipada kan pẹlu atokọ ti awọn imeeli, eyiti Mo farabalẹ fi sii Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS - ipo fun ibẹrẹ oniṣẹ. Ninu ọran wa, lẹta naa yoo fo si awọn ọga nikan ti gbogbo awọn igbẹkẹle ba ti ṣiṣẹ ni ifijišẹ;
tg_bot_conn_id='tg_main' - awọn ariyanjiyan conn_id gba awọn ID asopọ ti a ṣẹda ninu Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - awọn ifiranṣẹ ni Telegram yoo fo kuro nikan ti awọn iṣẹ ṣiṣe ti o ṣubu ba wa;
task_concurrency=1 - a ṣe idiwọ ifilọlẹ igbakana ti ọpọlọpọ awọn iṣẹlẹ iṣẹ-ṣiṣe ti iṣẹ-ṣiṣe kan. Bibẹẹkọ, a yoo gba ọpọlọpọ awọn ifilọlẹ nigbakanna VerticaOperator (nwa ni ọkan tabili);
report_update >> [email, tg] - gbogbo VerticaOperator darapọ ni fifiranṣẹ awọn lẹta ati awọn ifiranṣẹ, bii eyi:
Ṣugbọn niwọn igba ti awọn oniṣẹ notifier ni awọn ipo ifilọlẹ oriṣiriṣi, ọkan nikan yoo ṣiṣẹ. Ninu Wiwo Igi, ohun gbogbo dabi wiwo diẹ diẹ:
Emi yoo sọ ọrọ diẹ nipa Makiro ati awọn ọrẹ wọn - oniyipada.
Macros jẹ awọn oniduro Jinja ti o le paarọ ọpọlọpọ alaye to wulo sinu awọn ariyanjiyan oniṣẹ. Fun apẹẹrẹ, bii eyi:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} yoo faagun si awọn akoonu ti awọn oniyipada ọrọ execution_date ni ọna kika YYYY-MM-DD: 2020-07-14. Apakan ti o dara julọ ni pe awọn oniyipada ọrọ-ọrọ ni a kan mọ si apẹẹrẹ iṣẹ-ṣiṣe kan pato (square kan ninu Iwo Igi), ati pe nigba ti a tun bẹrẹ, awọn aaye yoo faagun si awọn iye kanna.
Awọn iye ti a sọtọ ni a le wo ni lilo bọtini Ti a ṣe lori apẹẹrẹ iṣẹ-ṣiṣe kọọkan. Eyi ni bii iṣẹ ṣiṣe pẹlu fifiranṣẹ lẹta kan:
Ati bẹ ni iṣẹ-ṣiṣe pẹlu fifiranṣẹ ifiranṣẹ kan:
Atokọ pipe ti awọn macros ti a ṣe sinu fun ẹya tuntun ti o wa nibi: Macros Reference
Pẹlupẹlu, pẹlu iranlọwọ ti awọn afikun, a le kede awọn macros tiwa, ṣugbọn iyẹn jẹ itan miiran.
Ni afikun si awọn ohun ti a ti sọ tẹlẹ, a le paarọ awọn iye ti awọn oniyipada wa (Mo ti lo eyi tẹlẹ ninu koodu loke). Jẹ ki a ṣẹda Admin/Variables nkan meji:
kan lo ọna si bọtini ti o fẹ: {{ var.json.bot_config.bot.token }}.
Emi yoo sọ ọrọ kan gangan ati ṣafihan sikirinifoto kan nipa awọn isopọ. Ohun gbogbo jẹ alakọbẹrẹ nibi: lori oju-iwe naa Admin/Connections a ṣẹda asopọ kan, ṣafikun awọn iwọle / awọn ọrọ igbaniwọle wa ati awọn aye pataki diẹ sii nibẹ. Bi eleyi:
Awọn ọrọ igbaniwọle le jẹ ti paroko (diẹ sii daradara ju aiyipada lọ), tabi o le fi iru asopọ silẹ (bii MO ṣe fun tg_main) - otitọ ni pe atokọ ti awọn oriṣi jẹ lile sinu awọn awoṣe Airflow ati pe ko le ṣe afikun laisi titẹ sinu koodu orisun (ti o ba lojiji Emi ko Google nkankan, jọwọ ṣe atunṣe mi), ṣugbọn ko si ohun ti yoo da wa duro lati gba awọn kirẹditi ni irọrun nipasẹ oruko.
O tun le ṣe awọn asopọ pupọ pẹlu orukọ kanna: ninu ọran yii, ọna naa BaseHook.get_connection(), eyi ti o gba wa awọn isopọ nipa orukọ, yoo fun laileto lati awọn orukọ pupọ (yoo jẹ ọgbọn diẹ sii lati ṣe Round Robin, ṣugbọn jẹ ki a fi silẹ lori ẹri-ọkàn ti awọn olupilẹṣẹ Airflow).
Awọn iyipada ati Awọn isopọ jẹ awọn irinṣẹ itura dajudaju, ṣugbọn o ṣe pataki lati ma padanu iwọntunwọnsi: awọn apakan ti ṣiṣan rẹ ti o fipamọ sinu koodu funrararẹ, ati awọn apakan wo ni o fun Airflow fun ibi ipamọ. Ni ọna kan, o le rọrun lati yi iye pada ni kiakia, fun apẹẹrẹ, apoti ifiweranṣẹ, nipasẹ UI. Ni apa keji, eyi tun jẹ ipadabọ si tẹ Asin, lati eyiti a (I) fẹ lati yọ kuro.
Ṣiṣẹ pẹlu awọn asopọ jẹ ọkan ninu awọn iṣẹ-ṣiṣe ìkọ. Ni gbogbogbo, awọn kio Airflow jẹ awọn aaye fun sisopọ rẹ si awọn iṣẹ ẹnikẹta ati awọn ile-ikawe. Fun apẹẹrẹ, JiraHook yoo ṣii onibara fun a nlo pẹlu Jira (o le gbe awọn iṣẹ-ṣiṣe pada ati siwaju), ati pẹlu iranlọwọ ti awọn SambaHook o le Titari faili agbegbe kan si smb-ojuami.
Ṣiṣayẹwo oniṣẹ ẹrọ aṣa
Ati pe a sunmọ lati wo bi o ti ṣe TelegramBotSendMessage
Koodu commons/operators.py pẹlu oniṣẹ gangan:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Nibi, bii ohun gbogbo miiran ni Airflow, ohun gbogbo rọrun pupọ:
Jogun lati BaseOperator, eyiti o ṣe awọn ohun kan pato Airflow diẹ (wo akoko isinmi rẹ)
Awọn aaye ti a kede template_fields, ninu eyiti Jinja yoo wa awọn macros lati ṣe ilana.
Seto awọn ọtun ariyanjiyan fun __init__(), ṣeto awọn aiyipada ibi ti pataki.
A ko gbagbe nipa ibẹrẹ ti baba naa boya.
Ṣii kio ti o baamu TelegramBotHook, gba ohun onibara lati ọdọ rẹ.
Yipadanu (ti a tunṣe) ọna BaseOperator.execute(), eyiti Airfow yoo tẹẹrẹ nigbati akoko ba de lati ṣe ifilọlẹ oniṣẹ - ninu rẹ a yoo ṣe iṣe iṣe akọkọ, gbagbe lati wọle. (A wọle, nipasẹ ọna, wọle stdout и stderr - Ṣiṣan afẹfẹ yoo ṣe idiwọ ohun gbogbo, fi ipari si ni ẹwa, bajẹ rẹ nibiti o jẹ dandan.)
Jẹ ká wo ohun ti a ni ninu commons/hooks.py. Apa akọkọ ti faili naa, pẹlu kio funrararẹ:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Emi ko paapaa mọ kini lati ṣalaye nibi, Emi yoo kan ṣe akiyesi awọn aaye pataki:
A jogun, ronu nipa awọn ariyanjiyan - ni ọpọlọpọ awọn ọran yoo jẹ ọkan: conn_id;
Yiyọ awọn ọna boṣewa: Mo ni opin ara mi get_conn(), ninu eyiti Mo gba awọn paramita asopọ nipasẹ orukọ ati pe o kan gba apakan naa extra (Eyi jẹ aaye JSON), ninu eyiti MO (gẹgẹbi awọn ilana ti ara mi!) Fi ami ami bot Telegram naa: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Mo ṣẹda apẹẹrẹ ti wa TelegramBot, fifun ni pato aami.
Gbogbo ẹ niyẹn. O le gba alabara kan lati inu kio kan nipa lilo TelegramBotHook().clent tabi TelegramBotHook().get_conn().
Ati apakan keji ti faili naa, ninu eyiti Mo ṣe microwrapper fun Telegram REST API, nitorinaa ki o ma ṣe fa kanna python-telegram-bot nitori ọna kan sendMessage.
Ọna ti o tọ ni lati ṣafikun gbogbo rẹ: TelegramBotSendMessage, TelegramBotHook, TelegramBot - ninu ohun itanna, fi sinu ibi ipamọ ti gbogbo eniyan, ki o fun ni Ṣii Orisun.
Lakoko ti a nkọ gbogbo eyi, awọn imudojuiwọn ijabọ wa ṣakoso lati kuna ni aṣeyọri ati firanṣẹ ifiranṣẹ aṣiṣe kan si mi. Emi yoo ṣayẹwo lati rii boya o jẹ aṣiṣe…
Nkankan bu ni doge wa! Ṣe kii ṣe ohun ti a nireti? Gangan!
Ti wa ni o ti lọ si tú?
Ṣe o lero Mo padanu nkankan? O dabi pe o ṣe ileri lati gbe data lati SQL Server si Vertica, ati lẹhinna o mu ki o lọ kuro ni koko-ọrọ, ẹlẹgàn!
Irufin yii jẹ imomose, Mo kan ni lati pinnu diẹ ninu awọn ọrọ-ọrọ fun ọ. Bayi o le lọ siwaju.
Ètò wa nìyí:
Ṣe bẹ
Ṣe ina awọn iṣẹ-ṣiṣe
Wo bi ohun gbogbo ṣe lẹwa
Fi awọn nọmba igba lati kun
Gba data lati SQL Server
Fi data sinu Vertica
Gba awọn iṣiro
Nitorinaa, lati gba gbogbo eyi ati ṣiṣe, Mo ṣe afikun kekere si wa docker-compose.yml:
a kun awọn apoti isura infomesonu ni igbehin pẹlu diẹ ninu awọn data (ninu ọran kankan ko wo sinu mssql_init.py!)
A ṣe ifilọlẹ gbogbo awọn ti o dara pẹlu iranlọwọ ti aṣẹ idiju diẹ diẹ sii ju akoko to kẹhin lọ:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Kini randomizer iyanu wa ti ipilẹṣẹ, o le lo nkan naa Data Profiling/Ad Hoc Query:
Ohun akọkọ kii ṣe lati fi han si awọn atunnkanka
ṣe alaye lori Awọn akoko ETL Emi kii yoo ṣe, ohun gbogbo jẹ bintin nibẹ: a ṣe ipilẹ kan, ami kan wa ninu rẹ, a fi ipari si ohun gbogbo pẹlu oluṣakoso ọrọ, ati ni bayi a ṣe eyi:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
igba.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Àkókò náà ti dé gba data wa lati wa ọkan ati idaji ọgọrun tabili. Jẹ ki a ṣe eyi pẹlu iranlọwọ ti awọn laini ti ko ni itumọ pupọ:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Pẹlu iranlọwọ ti a kio ti a gba lati Airflow pymssql-sopọ
Jẹ ki a paarọ ihamọ kan ni irisi ọjọ kan sinu ibeere naa - yoo sọ sinu iṣẹ nipasẹ ẹrọ awoṣe.
Ifunni ibeere wa pandastani yio gba wa DataFrame - yoo wulo fun wa ni ojo iwaju.
Mo n lo aropo {dt} dipo ti a ìbéèrè paramita %s ko nitori Mo wa ohun buburu Pinocchio, ṣugbọn nitori pandas ko le mu pymssql ati ki o yo awọn ti o kẹhin params: Listbiotilejepe o gan fe tuple.
Tun akiyesi pe Olùgbéejáde pymssql pinnu lati ma ṣe atilẹyin fun u mọ, ati pe o to akoko lati lọ kuro pyodbc.
Jẹ ki a wo kini Airflow ṣe awọn ariyanjiyan ti awọn iṣẹ wa pẹlu:
Ti ko ba si data, lẹhinna ko si aaye lati tẹsiwaju. Ṣugbọn o tun jẹ ajeji lati ṣe akiyesi kikun ni aṣeyọri. Ṣugbọn eyi kii ṣe aṣiṣe. A-ah-ah, kini lati ṣe?! Ati pe kini eyi:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException yoo sọ fun Airflow pe ko si awọn aṣiṣe, ṣugbọn a fo iṣẹ naa. Ni wiwo yoo ko ni a alawọ ewe tabi pupa square, ṣugbọn Pink.
ID ti igba iṣan omi wa (yoo yatọ fun gbogbo iṣẹ-ṣiṣe),
A hash lati orisun ati ID aṣẹ - nitorinaa ni ibi ipamọ data ikẹhin (nibiti ohun gbogbo ti dà sinu tabili kan) a ni ID aṣẹ alailẹgbẹ kan.
Igbesẹ penultimate wa: tú ohun gbogbo sinu Vertica. Ati pe, ni iyalẹnu to, ọkan ninu awọn ọna iyalẹnu julọ ati lilo daradara lati ṣe eyi ni nipasẹ CSV!
- Daradara, - wi kekere Asin, - ni ko o, bayi
Ṣe o da ọ loju pe Emi ni ẹranko ti o ni ẹru julọ ninu igbo?
Julia Donaldson, The Gruffalo
Mo ro pe ti awọn ẹlẹgbẹ mi ati Emi ni idije kan: tani yoo yara ṣẹda ati ṣe ifilọlẹ ilana ETL kan lati ibere: wọn pẹlu SSIS wọn ati Asin ati mi pẹlu Airflow… Ati lẹhinna a yoo tun ṣe afiwe irọrun itọju… Iro ohun, Mo ro pe o yoo gba pe Emi yoo lu wọn lori gbogbo awọn iwaju!
Ti o ba jẹ diẹ sii ni pataki, lẹhinna Apache Airflow - nipa apejuwe awọn ilana ni irisi koodu eto - ṣe iṣẹ mi pọ diẹ itura ati igbaladun.
Agbara ailopin rẹ, mejeeji ni awọn ofin ti plug-ins ati predisposition si scalability, fun ọ ni aye lati lo Airflow ni fere eyikeyi agbegbe: paapaa ni kikun ọmọ ti gbigba, ngbaradi ati ṣiṣe data, paapaa ni ifilọlẹ awọn apata (si Mars, ti dajudaju).
Apá ipari, itọkasi ati alaye
Awọn àwárí ti a ti gba fun o
start_date. Bẹẹni, eyi ti jẹ meme agbegbe tẹlẹ. Nipasẹ Doug ká akọkọ ariyanjiyan start_date gbogbo kọja. Ni soki, ti o ba pato ninu start_date lọwọlọwọ ọjọ, ati schedule_interval - ni ọjọ kan, lẹhinna DAG yoo ṣe ifilọlẹ ko ṣaaju ọla.
start_date = datetime(2020, 7, 7, 0, 1, 2)
Ati pe ko si awọn iṣoro diẹ sii.
Aṣiṣe asiko-ṣiṣe miiran wa ti o ni nkan ṣe pẹlu rẹ: Task is missing the start_date parameter, eyiti o tọka nigbagbogbo pe o gbagbe lati sopọ mọ oniṣẹ ẹrọ dag.
Ohun gbogbo lori ẹrọ kan. Bẹẹni, ati awọn ipilẹ (Airflow funrararẹ ati ibora wa), ati olupin wẹẹbu kan, ati oluṣeto, ati awọn oṣiṣẹ. Ati pe o ṣiṣẹ paapaa. Ṣugbọn ni akoko pupọ, nọmba awọn iṣẹ-ṣiṣe fun awọn iṣẹ dagba, ati nigbati PostgreSQL bẹrẹ lati dahun si atọka ni 20 s dipo 5 ms, a mu ati gbe lọ.
Aṣẹṣẹ agbegbe. Bẹẹni, a tun joko lori rẹ, ati pe a ti de eti ọgbun naa. LocalExecutor ti to fun wa titi di isisiyi, ṣugbọn nisisiyi o to akoko lati faagun pẹlu o kere ju oṣiṣẹ kan, ati pe a yoo ni lati ṣiṣẹ takuntakun lati gbe lọ si CeleryExecutor. Ati ni wiwo otitọ pe o le ṣiṣẹ pẹlu rẹ lori ẹrọ kan, ko si ohun ti o da ọ duro lati lo Seleri paapaa lori olupin kan, eyiti “dajudaju, kii yoo lọ sinu iṣelọpọ, nitootọ!”
Ti kii ṣe lilo -itumọ ti ni irinṣẹ:
awọn isopọ lati tọju awọn iwe-ẹri iṣẹ,
SLA padanu lati dahun si awọn iṣẹ ṣiṣe ti ko pari ni akoko,
xcom fun paṣipaarọ metadata (Mo sọ metadata!) laarin awọn iṣẹ-ṣiṣe dag.
mail abuse. O dara, kini MO le sọ? Awọn itaniji ti ṣeto fun gbogbo awọn atunwi ti awọn iṣẹ ṣiṣe ti o ṣubu. Bayi Gmail iṣẹ mi ni> awọn imeeli 90k lati Airflow, ati muzzle mail wẹẹbu kọ lati gbe ati paarẹ diẹ sii ju 100 ni akoko kan.
Ni ibere fun wa lati ṣiṣẹ paapaa pẹlu awọn ori wa kii ṣe pẹlu ọwọ wa, Airflow ti pese sile fun wa:
REST API — o tun ni ipo idanwo, eyiti ko ṣe idiwọ rẹ lati ṣiṣẹ. Pẹlu rẹ, o ko le gba alaye nikan nipa awọn dags ati awọn iṣẹ-ṣiṣe, ṣugbọn tun da / bẹrẹ dag kan, ṣẹda DAG Run tabi adagun kan.
CLI - ọpọlọpọ awọn irinṣẹ wa nipasẹ laini aṣẹ ti kii ṣe inira lati lo nipasẹ WebUI, ṣugbọn ko si ni gbogbogbo. Fun apere:
backfill nilo lati tun awọn iṣẹlẹ iṣẹ-ṣiṣe bẹrẹ.
Fun apẹẹrẹ, awọn atunnkanka wa o sọ pe: “Ati iwọ, ẹlẹgbẹ, ni ọrọ isọkusọ ninu data lati Oṣu Kini ọjọ 1 si 13! Ṣe atunṣe, ṣe atunṣe, ṣe atunṣe, tun ṣe!" Ati pe iwọ jẹ alarinrin bẹ:
run, eyiti o fun ọ laaye lati ṣiṣe iṣẹ-ṣiṣe apẹẹrẹ kan, ati paapaa Dimegilio lori gbogbo awọn igbẹkẹle. Pẹlupẹlu, o le ṣiṣe nipasẹ LocalExecutor, paapaa ti o ba ni iṣupọ Seleri.
Ṣe lẹwa Elo ohun kanna test, nikan tun ni awọn ipilẹ kọ ohunkohun.
connections faye gba ibi-ẹda awọn isopọ lati ikarahun.
API Python - ọna kuku ogbontarigi ti ibaraenisepo, eyiti o jẹ ipinnu fun awọn afikun, ati pe ko swaring ninu rẹ pẹlu awọn ọwọ kekere. Ṣugbọn tani yoo da wa duro lati lọ si /home/airflow/dags, sure ipython ki o si bẹrẹ idotin ni ayika? O le, fun apẹẹrẹ, okeere gbogbo awọn asopọ pẹlu koodu atẹle:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Nsopọ si metadatabase Airflow. Emi ko ṣeduro kikọ si rẹ, ṣugbọn gbigba awọn ipinlẹ iṣẹ-ṣiṣe fun ọpọlọpọ awọn metiriki pato le jẹ yiyara pupọ ati rọrun ju nipasẹ eyikeyi awọn API.
Jẹ ki a sọ pe kii ṣe gbogbo awọn iṣẹ-ṣiṣe wa ni agbara, ṣugbọn wọn le ṣubu nigbakan, ati pe eyi jẹ deede. Ṣugbọn awọn idena diẹ ti wa tẹlẹ ifura, ati pe yoo jẹ pataki lati ṣayẹwo.
Ṣọra SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
jo
Ati pe dajudaju, awọn ọna asopọ mẹwa akọkọ lati ipinfunni Google jẹ awọn akoonu ti folda Airflow lati awọn bukumaaki mi.
Zen ti Python ati Apache Airflow - Ifilọlẹ DAG titọ, jiju ọrọ ọrọ sinu awọn iṣẹ, lẹẹkansi nipa awọn igbẹkẹle, ati paapaa nipa awọn ifilọlẹ iṣẹ-ṣiṣe fo.