Hi! I'm Dmitry Logvinenko, a Data Engineer in the Analytics department of the Vezet group of companies.
I'm going to tell you about a wonderful tool for developing ETL processes: Apache Airflow. Airflow is so versatile and many-sided that you should take a closer look at it even if you don't work with data but have a need to periodically launch processes and monitor their execution.
And yes, I won't just tell, I'll also show: the article has plenty of code, screenshots, and recommendations.
What you usually see when you google the word Airflow / Wikimedia Commons
Apache Airflow is simply the best, and it was made for many purposes, namely (as stated before the cut):
launching and monitoring tasks on an unlimited number of machines (as many as Celery / Kubernetes and your conscience will allow you),
with dynamic workflow generation from very easy-to-write and easy-to-read Python code,
and the ability to connect any databases and APIs to each other using both ready-made components and home-made plugins (which is extremely simple).
We use Apache Airflow like this:
we collect data from various sources (lots of SQL Server and PostgreSQL instances, assorted APIs with application metrics, even 1C) into DWH and ODS (we use Vertica and Clickhouse);
as an advanced cron that launches the data consolidation processes on the ODS and watches over their upkeep.
Until recently, our needs were covered by one small server with 32 cores and 50 GB of RAM. In Airflow this runs:
more than 200 dags (actually workflows, which we stuff with tasks),
about 70 tasks in each on average,
and this goodness kicks off (also on average) once an hour.
As for how we scaled out, I'll write below; for now let's define the über-task we are going to solve:
There are three source SQL Servers, each with 50 databases, instances of one and the same project; accordingly they have the same structure (almost everywhere, mwa-ha-ha), which means each has an Orders table (fortunately, a table with that name can be pushed into any business). We take the data, add service fields (source server, source database, ETL task ID) and naively toss it into, say, Vertica.
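The "add service fields" step can be sketched in a few lines of plain Python (the field names here are my own illustration, not the project's actual schema):

```python
def add_service_fields(rows, source_server, source_db, etl_id):
    """Enrich source rows with ETL bookkeeping columns before loading.

    Each row is a dict; we tag it with where it came from and which
    ETL session moved it, so rows from identically structured
    databases can be told apart in the target table.
    """
    return [{**row,
             'etl_source_server': source_server,
             'etl_source_db': source_db,
             'etl_id': etl_id}
            for row in rows]
```

Note that the originals stay untouched: the helper builds new dicts, which keeps retries of the same load idempotent on the Python side.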
Let's go!
The main part, practical (and a little theoretical)
Why we (and you) need it
Back when the trees were big and I was a simple SQL guy at one Russian retailer, we slapped together ETL processes, a.k.a. data flows, using the two tools available to us:
Informatica Power Center: an extremely widespread system, super productive, with its own hardware and its own versioning. I used, God willing, 1% of its capabilities. Why? Well, first of all, this interface, from somewhere out of the 380s, quietly wore us down. Second, this contraption is built for extremely fancy processes, furious component reuse and other very-important-enterprise tricks. About the fact that it costs as much as a wing of an Airbus A380 per year, we'll say nothing.
Careful, the screenshot may hurt people under 30 a little
SQL Server Integration Services: we used this comrade in our intra-project flows. Well, honestly: we were already using SQL Server, and it would somehow have been unreasonable not to use its ETL tools. Everything in it is good: the interface is pretty, and so are the progress reports... But that's not why we love software products, oh no, not for that. We can version it as dtsx (which is XML with its nodes shuffled on save), but what's the point? And how about making a task package that drags hundreds of tables from one server to another? Hundreds, ha: your index finger would fall off at twenty pieces, clicking away at the mouse button. But it definitely looks more fashionable:
We certainly looked for a way out. The matter even nearly came down to a self-written SSIS package generator...
...and then a new job found me. And at it, Apache Airflow caught up with me.
When I found out that ETL process descriptions are simple Python code, I all but danced for joy. This is how data flows got versioned and diffed, and pouring tables of identical structure from hundreds of databases into one target became a matter of Python code on one and a half or two 13-inch screens.
Assembling a stack
Let's not set up a full-blown kindergarten here and talk about completely obvious things, like installing Airflow, the database you've chosen, Celery and the other cases described in the docs.
So that we can begin experimenting right away, I sketched out a docker-compose.yml in which:
we actually raise Airflow: Scheduler, Webserver. Flower will also spin there, to monitor Celery tasks (because it's already baked into apache/airflow:1.10.10-python3.7, and we don't mind);
PostgreSQL, into which Airflow will write its service information (scheduler data, execution statistics and so on), and in which Celery will mark finished tasks;
Redis, which will act as the task broker for Celery;
a Celery worker, which will handle the direct execution of tasks.
Into the folder ./dags we'll add our files with the descriptions of the dags. They'll be picked up on the fly, so there's no need to juggle the whole stack after every sneeze.
In some places, the code in the examples is not shown in full (so as not to clutter the text), and in some places it gets modified along the way. Complete working code examples can be found in the repository https://github.com/dm-logv/airflow-tutorial.
When putting the compose file together, I leaned heavily on the well-known image puckel/docker-airflow: be sure to take a look at it. Maybe you won't need anything else in your life.
All Airflow settings are available not only through airflow.cfg but also through environment variables (thanks to the developers), which I made malicious use of.
Naturally, it's not production-ready: I deliberately didn't put heartbeats on the containers and didn't bother with security. But I did the minimum suitable for our experiments.
Note that:
the dag folder must be accessible to both the scheduler and the workers;
the same goes for all third-party libraries: they must all be installed on the machines with the scheduler and with the workers.
Well, now it's simple:
$ docker-compose up --scale worker=3
After everything comes up, you can take a look at the web interfaces:
If you understood nothing in all these "dags", here's a short glossary:
Scheduler: the most important uncle in Airflow, the one who makes sure that it's robots doing the hard work, not a human. It watches the schedule, refreshes the dags, launches tasks.
In older versions it actually had memory problems (no, not amnesia, but leaks), and a legacy parameter even survived in the settings, run_duration, its restart interval. But now everything is fine.
DAG (a.k.a. "dag"): a "directed acyclic graph", a definition that will mean something to few people; in practice it's a container for tasks that interact with each other (see below), or an analogue of the Package in SSIS and the Workflow in Informatica.
Besides dags there can also be subdags, but we most likely won't get to them.
DAG Run: an initialized dag that has been assigned its own execution_date. Dagruns of the same dag can happily run in parallel (provided, of course, that you've made your tasks idempotent).
Operator: a piece of code responsible for performing one specific action. There are three types of operators:
an action, like our beloved PythonOperator, which can execute any (valid) Python code;
a transfer, which hauls data from place to place, say MsSqlToHiveTransfer;
a sensor, which, on the other hand, lets you react to events, or throttle further execution of the dag, until something happens. HttpSensor can poll a given endpoint, and when the desired response is waiting there, start a transfer such as GoogleCloudStorageToS3Operator. An inquisitive mind will ask: "Why? After all, you could do the retries right in the operator!" And then answer: so as not to clog the task pool with suspended operators. The sensor starts up, checks, and dies before the next attempt.
Task: declared operators, regardless of type, attached to a dag, are promoted to the rank of tasks.
Task instance: when the scheduler general decides it's time to send tasks into battle on the performer-workers (right on the spot if we use LocalExecutor, or on a remote node in the case of CeleryExecutor), it assigns them a context (that is, a set of variables, the execution parameters), expands the command or query templates, and dispatches them.
Spawning tasks
First, let's outline the general scheme of our dag, and then we'll dive deeper and deeper into the details, because we apply some non-trivial solutions.
So, in its simplest form, such a dag looks like this:
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)
Let's sort it out:
first we import the libraries we need, and a little something else;
sql_server_ds is a List[namedtuple[str, str]] with the names of the connections from Airflow Connections and the databases from which we'll pick up our table;
dag is the declaration of our dag, which must necessarily live in globals(), otherwise Airflow won't find it. In it we also need to say:
that it kicks off from midnight on the eighth of July,
and that it should run roughly every 6 hours (for the tough guys, instead of timedelta() there's the cron line 0 0 0/6 ? * * *; for the less cool, an expression like @daily);
workflow() will do the main job, but not right now. For now, our function will simply dump the context into the log.
And now the simple magic of creating tasks:
we run through our sources;
and initialize a PythonOperator, which will execute our dummy workflow(). Don't forget to give the task a name unique within the dag, and to tie in the dag itself. The provide_context flag, in turn, will pour additional arguments into the function, which we'll carefully collect with **context.
For now, that's all. What we've got:
a new dag in the web interface,
a hundred and fifty tasks that will be executed in parallel (as far as the Airflow and Celery settings and the server's capacity allow).
Well, almost got.
Who's going to install all the dependencies?
To make this whole thing simpler, I wired into docker-compose.yml the processing of a requirements.txt on all nodes.
Now off it goes:
The gray squares are task instances being processed by the scheduler.
We wait a little, and the tasks are snapped up by the workers:
The green ones, naturally, finished their work successfully. The red ones, not so much.
By the way, there's no ./dags folder on prod at all, and no syncing between machines: all the dags live in git on our Gitlab, and Gitlab CI rolls out updates to the machines on merge into master.
A little about Flower
While the workers are threshing our dummy tasks, let's recall another tool that can show us a thing or two: Flower.
The first page, with summary information on the worker nodes:
The most intense page, with the tasks that have gone off to work:
The most interesting page, with the status of our broker:
The brightest page, with graphs of task statuses and their execution times:
Picking up the fallen
So, all the tasks have run, and the wounded can be carried off.
And there were quite a few wounded, for one reason or another. When Airflow is used correctly, these very squares tell you that the data definitely did not arrive.
You need to look at the log and restart the fallen task instances.
By clicking on any square, we'll see the actions available to us:
You can simply take and Clear the fallen one. That is, we forget that something failed there, and the same instance task goes back to the scheduler.
Clearly, doing this with the mouse over all the red squares is not very humane, and it's not what we expect from Airflow. Naturally, we have weapons of mass destruction: Browse/Task Instances
Let's select everything at once and reset it to zero by clicking the right item:
After the cleanup, our taxis look like this (they're already waiting for the scheduler to schedule them):
Connections, hooks and other variables
It's time to look at the next DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Good sirs, the reports have been updated"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Natasha, wake up, we dropped {{ dag.dag_id }}
    """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]
Has everyone done a report update at some point? That's it again: there's a list of sources to get the data from; there's a list of places to put it; don't forget to honk when everything has happened or broken (well, that's not about us, no).
Let's go through the file again and look over the new obscure bits:
from commons.operators import TelegramBotSendMessage: nothing prevents us from making our own operators, which we took advantage of by building a small wrapper for sending messages to the Unblocked One. (We'll talk more about this operator below);
default_args={}: the dag can hand out the same arguments to all of its operators;
to='{{ var.value.all_the_kings_men }}': the to field won't be hardcoded; it's generated on the fly by Jinja from a variable with a list of emails, which I carefully tucked into Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS: the condition for starting the operator. In our case, the letter flies to the bosses only if all the dependencies have finished successfully;
tg_bot_conn_id='tg_main': conn_id arguments take the IDs of connections that we create in Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED: messages in Telegram fly out only if there are fallen tasks;
task_concurrency=1: we forbid the simultaneous launch of several task instances of one task. Otherwise we'd get several VerticaOperator running at once (hammering at a single table);
report_update >> [email, tg]: all the VerticaOperators converge on sending the letter and the message, like this:
But since the notifier operators have different trigger conditions, only one will fire. In the Tree View, everything looks somewhat less visual:
I'll say a few words about macros and their friends, variables.
Macros are Jinja placeholders that can substitute various useful bits of information into operator arguments. For example, like this:
SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} will expand to the contents of the context variable execution_date in the format YYYY-MM-DD: 2020-07-14. The best part is that context variables are nailed to a specific task instance (its square in the Tree View), and on a rerun the placeholders expand to exactly the same values.
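How Airflow renders such template fields against a task instance's context can be illustrated with a toy substitute (a naive regex stand-in, not the real Jinja engine Airflow uses):

```python
import re

def render(template: str, context: dict) -> str:
    """Naive stand-in for Jinja rendering of {{ name }} placeholders.

    Airflow uses full Jinja2; this mimics only simple variable
    substitution to show why reruns expand to identical values:
    the context is pinned to the task instance.
    """
    return re.sub(r'\{\{\s*(\w+)\s*\}\}',
                  lambda m: str(context[m.group(1)]),
                  template)

query = "WHERE payment_dtm::DATE = '{{ ds }}'::DATE"
context = {'ds': '2020-07-14'}  # nailed to this task instance
print(render(query, context))  # → WHERE payment_dtm::DATE = '2020-07-14'::DATE
```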
The values that were assigned can be viewed via the Rendered button on any task instance. This is how it looks for the task that sends the letter:
And like this for the task that sends the message:
A complete list of the built-in macros for the latest available version is here: macros reference
Moreover, with the help of plugins we can declare macros of our own, but that's another story.
Besides the predefined things, we can also substitute the values of our own variables (I already used this in the code above). Let's create two entries in Admin/Variables:
and then just use the path to the needed key: {{ var.json.bot_config.bot.token }}.
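What a path like {{ var.json.bot_config.bot.token }} does can be mimicked in a few standard-library lines (a toy lookup over a made-up variable store, not Airflow's implementation):

```python
import json
from functools import reduce

def var_json(variables: dict, name: str, path: str):
    """Parse a JSON Variable and walk a dotted path into it,
    the way {{ var.json.<name>.<path> }} does in a template."""
    value = json.loads(variables[name])
    return reduce(lambda obj, key: obj[key], path.split('.'), value)

# A made-up Admin/Variables store:
variables = {'bot_config': '{"bot": {"token": "YOuRAwEsomeBOtToKen"}}'}
print(var_json(variables, 'bot_config', 'bot.token'))  # → YOuRAwEsomeBOtToKen
```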
I'll literally say one word, and show one screenshot, about connections. Everything here is elementary: on the Admin/Connections page we create a connection, and add our logins/passwords and more specific parameters there. Like this:
Passwords can be encrypted (better than the default), or you can leave out the connection type (as I did for tg_main): the thing is, the list of types is hardwired into Airflow's models and can't be extended without getting into the source code (if I suddenly failed to google something here, please correct me), but nothing stops us from getting credentials simply by name.
You can also make several connections with the same name: in that case, the method BaseHook.get_connection(), which fetches connections by name, will hand out a random one from among the namesakes (it would be more logical to do Round Robin, but let's leave that on the conscience of the Airflow developers).
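The Round Robin the author wishes for is essentially a one-liner with itertools; a sketch of the idea (not Airflow code):

```python
from itertools import cycle

class RoundRobinPicker:
    """Hand out same-named connections in turn instead of at random."""
    def __init__(self, connections):
        self._it = cycle(connections)

    def get(self):
        return next(self._it)

picker = RoundRobinPicker(['conn_a', 'conn_b'])
print([picker.get() for _ in range(4)])  # → ['conn_a', 'conn_b', 'conn_a', 'conn_b']
```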
Variables and Connections are certainly nice tools, but it's important not to lose the balance: which parts of your flows you store in the code itself, and which parts you hand over to Airflow for storage. On the one hand, it can be convenient to quickly change a value, say a mailing box, through the UI. On the other hand, that is again a return to the mouse click we (I) wanted to get away from.
Working with connections is one of the jobs of hooks. In general, Airflow hooks are points for plugging it into third-party services and libraries. For example, JiraHook will open us a client to interact with Jira (you can move tasks back and forth), and with SambaHook you can push a local file to an smb share.
Dissecting a custom operator
And we've come close to looking at how TelegramBotSendMessage is made.
The code of commons/operators.py with the operator itself:
from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)
Here, as with everything else in Airflow, it's all very simple:
we inherit from BaseOperator, which implements quite a few Airflow-specific things (take a look at your leisure);
we declare the fields in template_fields, in which Jinja will look for macros to process;
we arrange the right arguments for __init__() and set defaults where needed;
we don't forget to initialize the ancestor either;
we open the corresponding hook, TelegramBotHook, and take a client object from it;
we override the (mandatory) method BaseOperator.execute(), which Airflow will jerk when the time comes to launch the operator; in it we implement the main action, not forgetting to log. (We log, by the way, straight into stdout and stderr: Airflow will intercept everything, wrap it up beautifully and file it away where needed.)
Let's see what we have in commons/hooks.py. The first part of the file, with the hook itself:
from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)
        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client
I don't even know what there is to explain here; I'll just note the important points:
we inherit and think about the arguments: in most cases there will be one, conn_id;
we override the standard methods: I limited myself to get_conn(), in which I fetch the connection parameters by name and simply take the section extra (a JSON field), into which I (following my own instructions!) put the Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"};
I create an instance of our TelegramBot, handing it the specific token.
That's all. You can get a client from the hook with TelegramBotHook().client or TelegramBotHook().get_conn().
And the second part of the file, in which I make a microwrapper for the Telegram REST API, so as not to drag in the whole python-telegram-bot for the sake of a single sendMessage method.
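The microwrapper itself is elided here; a minimal sketch of what such a class might look like, using only the standard library instead of requests_toolbelt (the api_url helper is my own naming, not the article's):

```python
import json
from urllib.request import Request, urlopen

class TelegramBot:
    """Tiny Telegram Bot API wrapper: just enough for sendMessage."""
    BASE_URL = 'https://api.telegram.org'

    def __init__(self, token: str):
        self._token = token

    def api_url(self, method: str) -> str:
        # Telegram Bot API methods live under /bot<token>/<method>
        return f'{self.BASE_URL}/bot{self._token}/{method}'

    def send_message(self, chat_id, message: str):
        payload = json.dumps({'chat_id': chat_id, 'text': message}).encode()
        request = Request(self.api_url('sendMessage'),
                          data=payload,
                          headers={'Content-Type': 'application/json'})
        # Performs the actual HTTP call; raises on non-2xx responses.
        return urlopen(request)
```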
The right thing to do would be to add all of it, TelegramBotSendMessage, TelegramBotHook, TelegramBot, to a plugin, put it in a public repository, and give it to Open Source.
While we were studying all this, our report update managed to fail successfully and send me an error message in the channel. I'm off to check whether it was wrong to do so...
Something broke in our doge! Isn't that just what we were expecting? Exactly!
Gonna pour?
Feel like I've missed something? It seems he promised to transfer data from SQL Server to Vertica, and then he up and wandered off topic, the scoundrel!
That atrocity was deliberate; I simply had to decipher some terminology for you first. Now we can go on.
Our plan is this:
Make the dag
Generate the tasks
Look at how pretty everything is
Assign session numbers to the fills
Get data from SQL Server
Put the data into Vertica
Collect statistics
So, to get all this running, I made a small addition to our docker-compose.yml:
Vertica as the host dwh with mostly default settings,
three instances of SQL Server,
and we fill the databases in the latter with some data (in no case look into mssql_init.py!).
We launch all the goodness with the help of a slightly more complicated command than last time:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
What the miracle randomizer generated, you can inspect with the item Data Profiling/Ad Hoc Query:
The main thing is not to show it to the analysts
I won't go into detail about ETL sessions; everything there is trivial: we make a base, there's a table in it, we wrap everything in a context manager, and now we do this:
with Session(connection, task_name) as session:
    print('Load', session.id, 'started')
    # Load workflow
    ...
    session.successful = True
    session.loaded_rows = 15
session.py
from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(connection, task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """
    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any([exc_type, exc_val, exc_tb]):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}\n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} '
                f'id={self.id} '
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,
                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,
                loaded_rows INT,
                comment     VARCHAR(500)
            );
        """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
        """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished = DEFAULT,
                successful = %s,
                loaded_rows = %s,
                comment = %s
            WHERE
                id = %s
            RETURNING id;
        """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)

        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass
The time has come to collect our data from our hundred and fifty tables. Let's do it with the help of some very unpretentious lines:
# src_conn_id, src_schema and the date dt arrive from the task context
import pandas as pd
from airflow.hooks.mssql_hook import MsSqlHook

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Using a hook, we get a pymssql connection from Airflow.
We substitute the date restriction into the query; the template engine will throw it into the function.
We feed the query to pandas, which gets us a DataFrame: it will come in handy later.
I use the substitution {dt} instead of a query parameter %s not because I'm an evil Pinocchio, but because pandas can't cope with pymssql and slips it params: List, even though it really wants a tuple.
Also note that the developer of pymssql has decided not to support it any longer, and it's time to move out to pyodbc.
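Since the date lands in the SQL string itself, it's worth making sure nothing but a date can get in there. A minimal guard might look like this (a hypothetical helper, not from the article's repository):

```python
from datetime import date, datetime

def sql_date_literal(dt) -> str:
    """Render a value as a safe 'YYYY-MM-DD' literal for f-string SQL.

    Accepts date/datetime objects or strings; rejects anything that
    does not parse as a plain ISO date, so nothing else can be injected.
    """
    if isinstance(dt, datetime):
        dt = dt.date()
    if not isinstance(dt, date):
        dt = datetime.strptime(str(dt), '%Y-%m-%d').date()
    return dt.isoformat()
```

Then the query becomes f"... = '{sql_date_literal(dt)}'" and stays readable while no longer trusting the incoming string blindly.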
Let's see what Airflow stuffed our function's arguments with:
If there's no data, there's no point in continuing. But it's also strange to count the fill as successful. Yet this isn't an error either. A-ah-ah, what to do?! Here's what:
from airflow.exceptions import AirflowSkipException

if df.empty:
    raise AirflowSkipException('No rows to load')
AirflowSkipException tells Airflow that there's no error, but we're skipping the task. The interface will show neither a green nor a red square, but a pink one.
On the other side, we create the target table. Here I allowed myself a little automation:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)
With VerticaOperator() I create the target schema and table (if they don't already exist, of course). The main thing is to arrange the dependencies correctly:
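How hash_id is computed isn't shown in the article; one plausible sketch (my assumption, not the author's code) is to hash the natural key together with its origin, so rows from identically structured databases can't collide:

```python
import zlib

def make_hash_id(source_server: str, source_db: str, order_id: int) -> int:
    """Derive a deterministic surrogate key for the target table.

    CRC32 of 'server/db/id' keeps the value within 32 bits, which
    comfortably fits a Vertica INT (Vertica integers are 64-bit).
    """
    key = f'{source_server}/{source_db}/{order_id}'.encode()
    return zlib.crc32(key)
```

Being deterministic, the key also makes reloads idempotent: the same source row always maps to the same hash_id.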
"Well," said the little mouse, "and now are you convinced that I'm the scariest beast in this forest?"
Julia Donaldson, The Gruffalo
I imagine my colleagues and me holding a competition over who would be quicker to create and launch an ETL process from scratch: them with their SSIS and a mouse, me with Airflow... And then we'd also compare ease of maintenance... Wow, I think you'll agree that I'd beat them on all fronts!
If a bit more seriously, then Apache Airflow, by describing processes in the form of program code, has made my job much more comfortable and enjoyable.
Its unlimited extensibility, both in terms of plugins and its predisposition to scalability, lets you apply Airflow in almost any area: even in the full cycle of collecting, preparing and processing data, even in launching rockets (to Mars, of course).
A closing part, reference and information
The rake we've collected for you
start_date. Yes, this is already a local meme. Everyone passes through the dag's main argument, start_date. Briefly: if you set start_date to the current date and schedule_interval to one day, then the DAG will start tomorrow at the earliest.
start_date = datetime(2020, 7, 7, 0, 1, 2)
And no more problems.
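The arithmetic behind the meme can be sketched with plain datetimes: the run carrying execution_date equal to start_date fires only after a whole schedule_interval has elapsed:

```python
from datetime import datetime, timedelta

def first_run_time(start_date: datetime, schedule_interval: timedelta) -> datetime:
    """When the scheduler actually fires the first dagrun.

    The first run carries execution_date == start_date, but it is
    launched only once that whole interval has passed.
    """
    return start_date + schedule_interval

start = datetime(2020, 7, 7, 0, 1, 2)
# With a daily interval, the first launch happens a day later:
print(first_run_time(start, timedelta(days=1)))  # → 2020-07-08 00:01:02
```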
There's another runtime error connected to it: Task is missing the start_date parameter, which most often signals that you forgot to bind the operator to the dag.
Everything on one machine. Yes: the base (Airflow itself plus our own coating), the web server, the scheduler and the workers. And it even worked. But over time the number of service tasks grew, and when PostgreSQL started answering an indexed query in 20 s instead of 5 ms, we picked it up and carried it off.
LocalExecutor. Yes, we're still sitting on it, and we've already walked up to the edge of the abyss. LocalExecutor has been enough for us so far, but now it's time to expand with at least one worker, and we'll have to work hard to move to CeleryExecutor. And given that you can work with it on a single machine, nothing stops you from using Celery even on one server, which "of course will never go into production, honest!"
Not using the built-in tools:
Connections, for storing service credentials,
SLA Misses, for reacting to tasks that didn't finish in time,
XCom, for exchanging metadata (I said metadata!) between dag tasks.
Mail abuse. Well, what can I say? Alerts were set up for every retry of fallen tasks. Now my work Gmail has more than 90k emails from Airflow, and the web-mail muzzle refuses to pick up and delete more than 100 of them at a time.
To make us work more with our heads and not our hands, Airflow has prepared for us:
REST API: it still carries Experimental status, which doesn't stop it from working. With it you can not only fetch information about dags and tasks, but also stop/start a dag, create a DAG Run or a pool.
CLI: many tools are available through the command line that are not just inconvenient to drive through the WebUI but absent from it altogether. For example:
backfill, needed to re-run task instances.
For example, the analysts came and said: "And you, comrade, have nonsense in the data from the 1st to the 13th of January! Fix it, fix it, fix it, fix it!" And you just go: bam!
Base service commands: initdb, resetdb, upgradedb, checkdb.
run, which lets you run a single instance task, and even score on all the dependencies. Moreover, you can run it via LocalExecutor even if you have a Celery cluster.
test does pretty much the same thing, only it additionally writes nothing to the bases.
connections allows mass creation of connections from the shell.
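For the backfill case above, the invocation under Airflow 1.10 syntax might look like this (the dag id orders is our example dag; the script only assembles and prints the command instead of executing it):

```shell
DAG_ID="orders"
START="2020-01-01"
END="2020-01-13"
# Re-run all task instances of the dag over the broken window:
CMD="airflow backfill -s $START -e $END $DAG_ID"
echo "$CMD"
```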
Python API: a rather hardcore way of interacting, intended for plugins and not for swarming into with your little hands. But who's to stop us from going to /home/airflow/dags, firing up ipython and starting to poke around? You can, for example, export all the connections with the following code:
from airflow import settings
from airflow.models import Connection

fields = 'conn_id conn_type host port schema login password extra'.split()

session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
    d = {field: getattr(conn, field) for field in fields}
    print(conn.conn_id, '=', d)
Connecting to the Airflow metadatabase. I don't recommend writing to it, but fetching task states for various specific metrics can be much faster and simpler than through any of the APIs.
Let's say not all of our tasks are idempotent; some may legitimately fall from time to time, and that's normal. But a pile-up of failures is already suspicious, and worth checking.
Beware, SQL!
WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        row_number()
        OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
),
failed AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
        CASE WHEN rn = row_number() OVER (
            PARTITION BY task_id, dag_id
            ORDER BY execution_date DESC)
            THEN TRUE END AS last_fail_seq
    FROM last_executions
    WHERE
        state IN ('failed', 'up_for_retry')
)
SELECT
    task_id,
    dag_id,
    count(last_fail_seq) AS unsuccessful,
    count(CASE WHEN last_fail_seq
        AND state = 'failed' THEN 1 END) AS failed,
    count(CASE WHEN last_fail_seq
        AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
    task_id,
    dag_id
HAVING
    count(last_fail_seq) > 0
References
And of course, the first ten links from a Google search are the contents of the Airflow folder in my bookmarks.
The Zen of Python and Apache Airflow: an article about declaring a DAG implicitly, throwing the context into tasks, dependencies once more, and about skipping launched tasks.