Hai, aku Dmitry Logvinenko - Insinyur Data Departemen Analytics saka grup perusahaan Vezet.
Aku bakal ngandhani sampeyan babagan alat sing apik kanggo ngembangake proses ETL - Apache Airflow. Nanging Airflow iku serbaguna lan multifaceted sing kudu njupuk dipikir nyedhaki malah yen sampeyan ora melu ing aliran data, nanging kudu periodik miwiti proses lan ngawasi eksekusi.
Lan ya, aku ora mung ngandhani, nanging uga nuduhake: program kasebut duwe akeh kode, gambar lan rekomendasi.
Apa sing biasane sampeyan deleng nalika sampeyan google tembung Airflow / Wikimedia Commons
- mung luwih apik, lan digawe kanggo tujuan sing beda, yaiku (kaya sing ditulis sadurunge kata):
mlaku lan ngawasi tugas ing mesin tanpa watesan (kaya akeh Celery / Kubernetes lan nurani sampeyan bakal ngidini sampeyan)
karo generasi alur kerja dinamis saka gampang banget kanggo nulis lan ngerti kode Python
lan kemampuan kanggo nyambungake database lan API karo saben liyane nggunakake loro komponen siap-digawe lan plugin ngarep-digawe (sing arang banget prasaja).
Kita nggunakake Apache Airflow kaya iki:
kita ngumpulake data saka macem-macem sumber (akeh SQL Server lan PostgreSQL kedadean, macem-macem API karo metrik aplikasi, malah 1C) ing DWH lan ODS (kita duwe Vertica lan Clickhouse).
carane maju cron, sing miwiti pangolahan konsolidasi data ing ODS, lan uga ngawasi pangopènan.
Nganti saiki, kabutuhan kita ditutupi dening siji server cilik kanthi 32 intine lan 50 GB RAM. Ing Airflow, iki bisa digunakake:
liyane 200 ewu (sajatine alur kerja, sing kita isi tugas),
ing saben rata-rata 70 tugas,
kabecikan iki diwiwiti (uga rata-rata) sapisan jam.
Lan babagan carane kita nggedhekake, aku bakal nulis ing ngisor iki, nanging saiki ayo nemtokake über-masalah sing bakal kita ngrampungake:
Ana telung SQL Servers dhisikan, saben karo 50 database - conto saka siji project, mungguh, padha duwe struktur padha (meh nang endi wae, mua-ha-ha), kang tegese saben duwe Tabel Pesenan (untung, tabel karo sing. jeneng bisa push menyang bisnis apa wae). We njupuk data kanthi nambah lapangan layanan (server sumber, database sumber, ETL tugas ID) lan naively uncalan menyang, ngomong, Vertica.
Ayo ayo!
Bagean utama, praktis (lan rada teoritis)
Kenapa kita (lan sampeyan)
Nalika wit gedhe lan aku prasaja SQL-schik ing siji toko Rusia, kita ngapusi proses ETL alias aliran data nggunakake rong alat sing kasedhiya kanggo kita:
Pusat Daya Informatika - sistem banget nyebar, arang banget produktif, karo hardware dhewe, versi dhewe. Aku nggunakake Gusti Allah ngalang-alangi 1% saka kemampuane. Kenging punapa? Inggih, pisanan kabeh, antarmuka iki, nang endi wae saka 380s, mental meksa kita. Kapindho, piranti iki dirancang kanggo proses sing apik banget, panggunaan komponen sing murka lan trik perusahaan liyane sing penting banget. Babagan kasunyatan manawa regane, kaya swiwi Airbus AXNUMX / taun, kita ora bakal ngomong apa-apa.
Ati-ati, gambar layar bisa ngrusak wong sing umure kurang saka 30 taun
Server Integrasi SQL Server - kita nggunakake kanca iki ing aliran intra-proyek. Ya, nyatane: kita wis nggunakake SQL Server, lan mesthi ora wajar yen ora nggunakake alat ETL. Kabeh iku apik: loro antarmuka ayu, lan laporan kemajuan ... Nanging iki dudu sebabe kita seneng produk piranti lunak, oh, ora kanggo iki. Versi iku dtsx (kang XML karo kelenjar shuffled ing nyimpen) kita bisa, nanging apa gunane? Kepiye carane nggawe paket tugas sing bakal nyeret atusan tabel saka siji server menyang server liyane? Ya, apa satus, driji telunjuk sampeyan bakal tiba saka rong puluh potongan, ngeklik tombol mouse. Nanging mesthi katon luwih modis:
Kita mesthi nggoleki cara metu. Kasus malah meh teka menyang generator paket SSIS sing ditulis dhewe ...
… banjur ana pegaweyan anyar ketemu aku. Lan Apache Airflow nyusul aku.
Nalika aku ketemu metu sing gambaran proses ETL kode Python prasaja, Aku mung ora nari kanggo bungah. Iki carane data stream padha versi lan diffed, lan pouring tabel karo struktur siji saka atusan database menyang siji target dadi masalah kode Python ing siji lan setengah utawa loro 13 "layar.
Ngumpul kluster
Ayo dadi ora ngatur TK rampung, lan ora ngomong bab rampung ketok kene, kaya nginstal Airflow, database milih, Celery lan kasus liyane diterangake ing docks.
Supaya kita bisa langsung miwiti eksperimen, aku nggawe sketsa docker-compose.yml kang:
Ayo bener mundhakaken airflow: Penjadwal, Webserver. Kembang uga bakal muter ing kana kanggo ngawasi tugas Celery (amarga wis di-push menyang apache/airflow:1.10.10-python3.7, nanging kita ora keberatan)
PostgreSQL, Airflow bakal nulis informasi layanan (data jadwal, statistik eksekusi, lan liya-liyane), lan Seledri bakal menehi tandha tugas sing wis rampung;
Redis, sing bakal tumindak minangka makelar tugas kanggo Celery;
Tukang celery, sing bakal melu eksekusi langsung tugas.
Kanggo folder ./dags kita bakal nambah file kita karo gambaran saka dags. Padha bakal dijupuk ing fly, supaya ana ora perlu kanggo juggle kabeh tumpukan sawise saben wahing.
Ing sawetara panggonan, kode ing conto ora rampung ditampilake (supaya ora clutter munggah teks), nanging nang endi wae wis diowahi ing proses. Conto kode kerja lengkap bisa ditemokake ing repositori https://github.com/dm-logv/airflow-tutorial.
Ing perakitan komposisi, aku umume ngandelake gambar sing kondhang puckel / docker-aliran udara - manawa kanggo mriksa metu. Mungkin sampeyan ora butuh apa-apa ing urip sampeyan.
Kabeh setelan Airflow kasedhiya ora mung liwat airflow.cfg, nanging uga liwat variabel lingkungan (thanks kanggo pangembang), sing aku njupuk kauntungan saka angkoro.
Mesthine, iki ora siap produksi: Aku sengaja ora nyelehake deg-degan ing wadhah, aku ora ngganggu keamanan. Nanging aku nindakake minimal sing cocog kanggo para eksperimen.
Elinga yen:
Folder dag kudu bisa diakses dening panjadwal lan buruh.
Padha ditrapake kanggo kabeh perpustakaan pihak katelu - kabeh kudu diinstal ing mesin karo panjadwal lan buruh.
Inggih, saiki prasaja:
$ docker-compose up --scale worker=3
Sawise kabeh munggah, sampeyan bisa ndeleng antarmuka web:
Yen sampeyan ora ngerti apa-apa ing kabeh "dags", iki kamus singkat:
Scheduler - paman paling penting ing Airflow, sing kontrol sing robot bisa hard, lan ora wong: ngawasi jadwal, nganyari dags, miwiti tugas.
Umumé, ing versi lawas, dheweke duwe masalah karo memori (ora, ora amnesia, nanging bocor) lan parameter warisan tetep ana ing konfigurasi. run_duration - interval miwiti maneh. Nanging saiki kabeh wis apik.
DAG (aka "dag") - "grafik asiklik sing diarahake", nanging definisi kasebut bakal ngandhani sawetara wong, nanging nyatane minangka wadhah kanggo tugas sing sesambungan (ndeleng ngisor) utawa analog saka Paket ing SSIS lan Alur Kerja ing Informatica .
Saliyane dags, ana uga isih subdags, nanging kita paling kamungkinan ora bakal njaluk menyang.
DAG Run - initialized dag, kang diutus dhewe execution_date. Dagrans saka dag padha bisa ing podo karo (yen sampeyan wis nggawe tugas idempotent, mesthi).
Operator minangka potongan kode sing tanggung jawab kanggo nindakake tumindak tartamtu. Ana telung jinis operator:
tumindakkaya favorit kita PythonOperator, kang bisa nglakokaké sembarang (valid) kode Python;
transfer, sing ngirim data saka panggonan menyang panggonan, ngomong, MsSqlToHiveTransfer;
sensor ing tangan liyane, iku bakal ngidini sampeyan kanggo nanggepi utawa alon mudhun execution luwih saka dag nganti ana acara. HttpSensor bisa narik titik pungkasan kasebut, lan nalika respon sing dikarepake nunggu, miwiti transfer GoogleCloudStorageToS3Operator. Pikiran sing kepengin weruh bakal takon: "kenapa? Sawise kabeh, sampeyan bisa nindakake repetisi langsung ing operator! Banjur, supaya ora clog blumbang tugas karo operator dilereni soko tugas. Sensor diwiwiti, mriksa lan mati sadurunge nyoba sabanjure.
Task - operator ngumumaké, preduli saka jinis, lan ditempelake ing dag dipun munggah pangkat tugas.
conto tugas - nalika perencana umum mutusake yen wis wektune ngirim tugas menyang perang marang para pekerja (ing papan, yen kita nggunakake LocalExecutor utawa menyang simpul remot ing cilik saka CeleryExecutor), menehi konteks kanggo wong-wong mau (yaiku, sakumpulan variabel - paramèter eksekusi), ngembangake template printah utawa query, lan nglumpukake.
Kita ngasilake tugas
Pisanan, ayo njelasake skema umum doug kita, banjur kita bakal nyilem rincian liyane lan liyane, amarga kita nggunakake sawetara solusi non-sepele.
Dadi, ing wangun sing paling gampang, dag bakal katon kaya iki:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Ayo dipikirake:
Pisanan, kita ngimpor libs perlu lan mergo;
sql_server_ds Punika List[namedtuple[str, str]] kanthi jeneng sambungan saka Sambungan Aliran Udara lan basis data sing bakal dijupuk piring kita;
dag - woro-woro dag kita, kang kudu ing globals(), yen ora Airflow ora bakal nemokake. Doug uga kudu ngomong:
sapa jenenge orders - jeneng iki banjur bakal katon ing antarmuka web,
dheweke bakal kerja wiwit tengah wengi tanggal wolu Juli,
lan kudu mlaku, kira-kira saben 6 jam (kanggo wong lanang sing angel ing kene tinimbang timedelta() bisa ditampa cron-line 0 0 0/6 ? * * *, kanggo kurang keren - ekspresi kaya @daily);
workflow() bakal nindakake proyek utama, nanging ora saiki. Saiki, kita mung bakal mbucal konteks kita menyang log.
Lan saiki sihir sing gampang nggawe tugas:
kita mbukak liwat sumber kita;
initialize PythonOperator, sing bakal nglakokake dummy kita workflow(). Aja lali kanggo nemtokake unik (ing dag) jeneng tugas lan dasi dag dhewe. Gendéra provide_context ing siji, bakal pour bantahan tambahan menyang fungsi, kang kasebut kanthi teliti, ngumpulake nggunakake **context.
Kanggo saiki, iku kabeh. Apa sing kita entuk:
dag anyar ing antarmuka web,
siji lan setengah atus tugas sing bakal kaleksanan ing podo karo (yen Airflow, setelan Celery lan kapasitas server ngidini).
Nah, meh entuk.
Sapa sing bakal nginstal dependensi?
Kanggo nyederhanakake kabeh iki, aku ngaco docker-compose.yml pangolahan requirements.txt ing kabeh simpul.
Saiki wis ilang:
Kothak abu-abu minangka conto tugas sing diproses dening panjadwal.
Kita ngenteni sethithik, tugas-tugas kasebut ditindakake dening para pekerja:
Sing ijo, mesthine wis kasil ngrampungake karyane. Reds ora banget sukses.
Miturut cara, ora ana folder ing prod kita ./dags, ora ana sinkronisasi antarane mesin - kabeh dags dumunung ing git ing Gitlab kita, lan Gitlab CI nyebarke nganyari menyang mesin nalika gabung master.
A sethitik babagan Flower
Nalika para buruh ngoyak dot, ayo elinga alat liyane sing bisa nuduhake apa-apa - Kembang.
Kaca pisanan kanthi informasi ringkesan babagan simpul pekerja:
Kaca sing paling kuat karo tugas sing bisa ditindakake:
Kaca sing paling mboseni kanthi status broker kita:
Kaca sing paling padhang yaiku kanthi grafik status tugas lan wektu eksekusi:
We mbukak underloaded
Dadi, kabeh tugas wis rampung, sampeyan bisa nindakake sing tatu.
Lan ana akeh sing tatu - amarga siji utawa liyane. Ing kasus panggunaan Airflow sing bener, kothak kasebut nuduhake manawa data kasebut mesthi ora teka.
Sampeyan kudu nonton log lan miwiti maneh conto tugas sing tiba.
Kanthi ngeklik kothak apa wae, kita bakal weruh tumindak sing kasedhiya kanggo kita:
Sampeyan bisa njupuk lan nggawe Clear tiba. Yaiku, kita lali yen ana sing gagal, lan tugas conto sing padha bakal menyang panjadwal.
Cetha yen nindakake iki nganggo mouse karo kabeh kothak abang ora banget manungsa - iki dudu sing dikarepake saka Airflow. Alami, kita duwe senjata pemusnah massal: Browse/Task Instances
Ayo pilih kabeh bebarengan lan reset menyang nol, klik item sing bener:
Sawise ngresiki, taksi kita katon kaya iki (wis ngenteni jadwal jadwal):
Sambungan, pancingan lan variabel liyane
Wektu kanggo ndeleng DAG sabanjure, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Apa kabeh wong wis nindakake nganyari laporan? Iki dheweke maneh: ana dhaptar sumber saka ngendi kanggo njupuk data; ana dhaftar ngendi kanggo nyelehake; aja lali klakson nalika kabeh kedadeyan utawa rusak (ya, iki dudu babagan kita, ora).
Ayo goleki file maneh lan deleng barang anyar sing ora jelas:
from commons.operators import TelegramBotSendMessage - ora ana sing ngalangi kita saka nggawe operator dhewe, kang kita njupuk kauntungan saka nggawe pambungkus cilik kanggo ngirim pesen kanggo Unlocked. (Kita bakal ngomong liyane babagan operator iki ing ngisor iki);
default_args={} - dag bisa nyebarake argumen sing padha menyang kabeh operator;
to='{{ var.value.all_the_kings_men }}' - lapangan to kita ora bakal hardcoded, nanging mbosenke kui nggunakake Jinja lan variabel karo dhaftar email, kang kasebut kanthi teliti, sijine ing. Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS - syarat kanggo miwiti operator. Ing kasus kita, layang kasebut bakal mabur menyang panggedhe mung yen kabeh dependensi wis rampung kasil;
tg_bot_conn_id='tg_main' - argumentasi conn_id nampa ID sambungan sing digawe ing Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - pesen ing Telegram bakal mabur mung yen ana tugas sing tiba;
task_concurrency=1 - kita nglarang Bukak simultaneous sawetara kedadean tugas siji tugas. Yen ora, kita bakal entuk peluncuran simultaneous sawetara VerticaOperator (ndeleng meja siji);
report_update >> [email, tg] - kabeh VerticaOperator konvergen ing ngirim layang lan pesen, kaya iki:
Nanging amarga operator notifikasi duwe kahanan peluncuran sing beda-beda, mung siji sing bisa digunakake. Ing Tree View, kabeh katon kurang visual:
Aku bakal ngomong sawetara tembung babagan makro lan kanca-kancane - variabel.
Macros minangka placeholder Jinja sing bisa ngganti macem-macem informasi migunani menyang argumen operator. Contone, kaya iki:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} bakal nggedhekake isi variabel konteks execution_date ing format YYYY-MM-DD: 2020-07-14. Sisih paling apik yaiku variabel konteks dipaku menyang conto tugas tartamtu (kotak ing Tree View), lan nalika diwiwiti maneh, placeholder bakal nggedhekake nilai sing padha.
Nilai sing ditugasake bisa dideleng nggunakake tombol Rendered ing saben conto tugas. Iki carane tugas ngirim layang:
Lan ing tugas ngirim pesen:
Dhaptar lengkap makro sing dibangun kanggo versi paling anyar sing kasedhiya kasedhiya ing kene: referensi macro
Kajaba iku, kanthi bantuan plugin, kita bisa ngumumake makro kita dhewe, nanging iki crita liyane.
Saliyane bab sing wis ditemtokake, kita bisa ngganti nilai-nilai variabel kita (aku wis nggunakake iki ing kode ing ndhuwur). Ayo nggawe ing Admin/Variables saperangan bab:
mung gunakake path menyang kunci sing dikarepake: {{ var.json.bot_config.bot.token }}.
Aku bakal ngomong siji tembung lan nuduhake siji gambar bab sambungan. Kabeh ana ing kene: ing kaca Admin/Connections kita nggawe sambungan, nambah login / sandhi lan paramèter sing luwih spesifik ing kana. Kaya iki:
Tembung sandhi bisa dienkripsi (luwih jero tinimbang standar), utawa sampeyan bisa ninggalake jinis sambungan (kaya aku tg_main) - Kasunyatane manawa dhaptar jinis kasebut dipasang ing model Airflow lan ora bisa ditambahi tanpa mlebu kode sumber (yen dumadakan aku ora google soko, mangga mbenerake aku), nanging ora ana sing bakal nyegah kita entuk kridit mung kanthi jeneng.
Sampeyan uga bisa nggawe sawetara sambungan kanthi jeneng sing padha: ing kasus iki, cara BaseHook.get_connection(), kang nemu kita sambungan dening jeneng, bakal menehi acak saka sawetara namesakes (bakal luwih logis kanggo nggawe Round Robin, nanging ayo kang ninggalake ing kalbu pangembang Airflow).
Variabel lan Sambungan mesthi alat kelangan, nanging iku penting kanggo ora ilang imbangan: kang bagean mili sampeyan nyimpen ing kode dhewe, lan bagean sing menehi kanggo Airflow kanggo panyimpenan. Ing tangan siji, bisa uga trep kanggo ngganti nilai kanthi cepet, contone, kothak surat, liwat UI. Ing tangan liyane, iki isih bali menyang klik mouse, saka kang kita (Aku) wanted kanggo njaluk nyisihaken saka.
Nggarap sambungan minangka salah sawijining tugas pancingan. Umumé, pancing Airflow minangka titik kanggo nyambungake menyang layanan lan perpustakaan pihak katelu. contone, JiraHook bakal mbukak klien kanggo kita sesambungan karo Jira (sampeyan bisa mindhah tugas bolak-balik), lan kanthi bantuan saka SambaHook sampeyan bisa push file lokal kanggo smb-titik.
Parsing operator khusus
Lan kita nyedhaki ndeleng carane digawe TelegramBotSendMessage
Kode commons/operators.py karo operator nyata:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Ing kene, kaya kabeh liyane ing Airflow, kabeh gampang banget:
Diturunake saka BaseOperator, sing nindakake sawetara perkara khusus Airflow (deleng wektu luang)
Bidang sing diumumake template_fields, ing ngendi Jinja bakal nggoleki makro kanggo diproses.
Disusun argumen sing tepat kanggo __init__(), nyetel standar yen perlu.
Kita uga ora lali babagan inisialisasi leluhur.
Mbukak pancing sing cocog TelegramBotHooknampa obyek klien saka iku.
Metode overridden (definisi ulang). BaseOperator.execute(), sing Airfow bakal kedutan nalika wektune mbukak operator - ing kono kita bakal ngetrapake tumindak utama, lali mlebu. (We log in, by the way, right in stdout и stderr - Aliran udara bakal nyegat kabeh, bungkus kanthi apik, decompose yen perlu.)
Ayo ndeleng apa sing kita duwe commons/hooks.py. Bagean pisanan file, kanthi pancing dhewe:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Aku malah ora ngerti apa sing arep diterangake ing kene, aku mung bakal nyathet poin penting:
We oleh warisan, mikir bab bantahan - ing paling kasus bakal dadi siji: conn_id;
Overriding cara standar: Aku winates dhewe get_conn(), ing ngendi aku njaluk paramèter sambungan kanthi jeneng lan mung entuk bagean kasebut extra (iki minangka lapangan JSON), ing ngendi aku (miturut pandhuanku dhewe!) sijine token bot Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Aku nggawe conto kita TelegramBot, menehi token tartamtu.
Mekaten. Sampeyan bisa njaluk klien saka pancing nggunakake TelegramBotHook().clent utawa TelegramBotHook().get_conn().
Lan bagean liya saka file, ing ngendi aku nggawe microwrapper kanggo Telegram REST API, supaya ora nyeret padha python-telegram-bot kanggo siji cara sendMessage.
Cara sing bener yaiku nambahake kabeh: TelegramBotSendMessage, TelegramBotHook, TelegramBot - ing plugin, sijine ing gudang umum, lan menehi menyang Open Source.
Nalika kita sinau kabeh iki, nganyari laporan kita bisa gagal lan ngirim pesen kesalahan ing saluran kasebut. Aku arep mriksa apa salah ...
Soko nyuwil ing doge kita! Apa dudu sing kita ngarepake? Persis!
Apa sampeyan arep pour?
Apa sampeyan rumangsa aku ora kejawab soko? Iku misale jek sing prajanji kanggo nransfer data saka SQL Server kanggo Vertica, lan banjur njupuk lan dipindhah mati topik, scoundrel!
Atrocity iki disengojo, aku mung kudu decipher sawetara terminologi kanggo sampeyan. Saiki sampeyan bisa luwih maju.
Rencana kita iki:
Dados
Nggawe tugas
Waca carane ayu kabeh
Nemtokake nomer sesi kanggo ngisi
Entuk data saka SQL Server
Sijine data menyang Vertica
Nglumpukake statistik
Dadi, kanggo ngrampungake kabeh iki, aku nggawe tambahan cilik kanggo kita docker-compose.yml:
Vertica minangka tuan rumah dwh kanthi setelan gawan paling akeh,
telung conto SQL Server,
kita isi database ing pungkasan karo sawetara data (ora katon menyang mssql_init.py!)
Kita miwiti kabeh sing apik kanthi bantuan perintah sing rada rumit tinimbang pungkasan:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Apa kita Ajaib randomizer kui, sampeyan bisa nggunakake item Data Profiling/Ad Hoc Query:
Sing utama yaiku ora nuduhake menyang analis
njlimet ing sesi ETL Aku ora bakal, kabeh ora pati penting ana: kita nggawe dhasar, ana tandha, kita mbungkus kabeh karo manajer konteks, lan saiki kita nindakake iki:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
sesi.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Wektu wis teka ngumpulake data kita saka siji lan setengah atus meja. Ayo nindakake iki kanthi bantuan garis sing ora sopan:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Kanthi bantuan saka pancing kita njaluk saka Airflow pymssql-nyambung
Ayo dadi sulih watesan ing wangun tanggal menyang request - iku bakal di buwang menyang fungsi dening mesin Cithakan.
Dipangan panyuwunan kita pandassing bakal njaluk kita DataFrame - bakal migunani kanggo kita ing mangsa ngarep.
Aku nggunakake substitusi {dt} tinimbang parameter request %s ora amarga aku Pinocchio ala, nanging amarga pandas ora bisa nangani pymssql lan mlengkung sing pungkasan params: Listsenajan dheweke kepengin banget tuple.
Elinga uga pangembang pymssql mutusaké ora ndhukung maneh, lan iku wektu kanggo pindhah metu pyodbc.
Ayo ndeleng apa Airflow isi argumen fungsi kita:
Yen ora ana data, mula ora ana gunane kanggo nerusake. Nanging uga aneh kanggo nimbang ngisi sukses. Nanging iki ora salah. A-ah-ah, apa sing kudu ditindakake?! Lan iki apa:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException ngandhani Airflow sing ora ana kasalahan, nanging kita skip tugas. Antarmuka ora bakal duwe kothak ijo utawa abang, nanging jambon.
ID sesi banjir kita (bakal beda kanggo saben tugas),
A hash saka sumber lan supaya ID - supaya ing database final (ngendi kabeh wis diwutahake menyang siji meja) kita duwe ID urutan unik.
Langkah penultimate tetep: tuangake kabeh menyang Vertica. Lan, anehe, salah sawijining cara sing paling apik lan efisien kanggo nindakake iki yaiku liwat CSV!
- Inggih, - ngandika mouse cilik, - ora iku, saiki
Apa sampeyan yakin yen aku iki kewan sing paling nggegirisi ing alas?
Julia Donaldson, The Gruffalo
Aku mikir yen kolega lan aku duwe kompetisi: sing bakal cepet nggawe lan miwiti proses ETL saka ngeruk: padha karo SSIS lan mouse lan aku karo Airflow ... Banjur kita uga bakal mbandhingaké ease saka pangopènan ... Wah, aku mikir sampeyan bakal setuju yen aku bakal ngalahake dheweke ing kabeh ngarep!
Yen luwih serius, Apache Airflow - kanthi njlentrehake proses ing wangun kode program - nindakake tugasku akeh luwih nyaman lan nyenengake.
Ekstensibilitas tanpa wates, ing babagan plug-in lan predisposisi kanggo skalabilitas, menehi kesempatan kanggo nggunakake Aliran Udara ing meh kabeh wilayah: sanajan ing siklus lengkap ngumpulake, nyiapake lan ngolah data, sanajan ngluncurake roket (menyang Mars, saka kursus).
Bagian pungkasan, referensi lan informasi
Garu sing wis diklumpukake kanggo sampeyan
start_date. Ya, iki wis dadi meme lokal. Liwat argumen utama Doug start_date kabeh lulus. Sedhela, yen sampeyan nemtokake ing start_date tanggal saiki, lan schedule_interval - siji dina, banjur DAG bakal miwiti sesuk ora sadurungé.
start_date = datetime(2020, 7, 7, 0, 1, 2)
Lan ora ana masalah maneh.
Ana kesalahan runtime liyane sing ana gandhengane: Task is missing the start_date parameter, sing paling asring nuduhake yen sampeyan kelalen ikatan karo operator dag.
Kabeh ing siji mesin. Ya, lan pangkalan (Airflow dhewe lan lapisan kita), lan server web, lan panjadwal, lan buruh. Lan malah bisa. Nanging suwe-suwe, jumlah tugas kanggo layanan tambah akeh, lan nalika PostgreSQL wiwit nanggapi indeks ing 20 s tinimbang 5 ms, kita njupuk lan digawa.
LocalExecutor. Ya, kita isih lungguh ing kono, lan kita wis teka ing pinggir jurang. LocalExecutor wis cukup kanggo kita supaya adoh, nanging saiki wektu kanggo nggedhekake karo paling siji buruh, lan kita kudu bisa hard kanggo pindhah menyang CeleryExecutor. Lan amarga kasunyatane sampeyan bisa nggarap siji mesin, ora ana sing ngalangi sampeyan nggunakake Celery sanajan ing server, sing "mesthi wae, ora bakal dadi produksi, jujur!"
Ora dienggo piranti dibangun ing:
sambungan kanggo nyimpen kredensial layanan,
SLA Kantun kanggo nanggapi tugas sing ora rampung ing wektu,
xcom kanggo ijol-ijolan metadata (aku ngomong metadata!) antarane tugas dag.
penyalahgunaan mail. Nah, apa sing bisa dakkandhakake? Tandha wis disetel kanggo kabeh pengulangan tugas sing tiba. Saiki Gmail karyaku duwe> 90k email saka Airflow, lan moncong email web ora gelem njupuk lan mbusak luwih saka 100 sekaligus.
Supaya kita bisa kerja luwih akeh kanthi sirah lan ora nganggo tangan, Airflow wis nyiapake kanggo kita:
REST API - dheweke isih nduweni status Eksperimental, sing ora ngalangi dheweke kerja. Karo, sampeyan ora mung bisa njaluk informasi bab dags lan tugas, nanging uga mungkasi / miwiti dag, nggawe DAG Run utawa blumbang.
CLI - akeh alat kasedhiya liwat baris printah sing ora mung nyaman kanggo nggunakake liwat WebUI, nanging umume absen. Tuladhane:
backfill dibutuhake kanggo miwiti maneh conto tugas.
Contone, analis teka lan ujar: "Lan sampeyan, kanca, duwe omong kosong ing data saka 1 nganti 13 Januari! Ndandani, ndandani, ndandani, ndandani!" Lan sampeyan kaya hob:
Layanan dhasar: initdb, resetdb, upgradedb, checkdb.
run, sing ngijini sampeyan kanggo mbukak siji tugas Kayata, lan malah ngetung ing kabeh dependensi. Kajaba iku, sampeyan bisa mbukak liwat LocalExecutor, sanajan sampeyan duwe klompok Celery.
Apa meh padha test, mung uga ing basa nulis apa-apa.
connections ngidini nggawe massa sambungan saka cangkang.
API Python - cara sesambungan sing rada hardcore, sing dimaksudake kanggo plugins, lan ora swarming karo tangan cilik. Nanging sapa sing ngalangi kita lunga /home/airflow/dags, mlayu ipython lan miwiti kekacoan watara? Sampeyan bisa, contone, ngekspor kabeh sambungan karo kode ing ngisor iki:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Nyambung menyang metadatabase Airflow. Aku ora nyaranake nulis, nanging entuk status tugas kanggo macem-macem metrik tartamtu bisa luwih cepet lan luwih gampang tinimbang nggunakake API apa wae.
Ayo dadi ngomong sing ora kabeh tugas kita idempoten, nanging kadhangkala bisa tiba, lan iki normal. Nanging sawetara pamblokiran wis curiga, lan kudu dipriksa.
Waspada SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
referensi
Lan mesthi, sepuluh pranala pisanan saka penerbitan Google minangka isi folder Airflow saka tetengerku.