Hai, saya Dmitry Logvinenko - Jurutera Data Jabatan Analitis kumpulan syarikat Vezet.
Saya akan memberitahu anda tentang alat yang hebat untuk membangunkan proses ETL - Apache Airflow. Tetapi Aliran Udara sangat serba boleh dan pelbagai rupa sehingga anda harus melihatnya dengan lebih dekat walaupun anda tidak terlibat dalam aliran data, tetapi perlu melancarkan sebarang proses secara berkala dan memantau pelaksanaannya.
Dan ya, saya bukan sahaja akan memberitahu, tetapi juga menunjukkan: program ini mempunyai banyak kod, tangkapan skrin dan cadangan.
Perkara yang biasa anda lihat apabila anda google perkataan Aliran Udara / Wikimedia Commons
- hanya lebih baik, dan ia dibuat untuk tujuan yang sama sekali berbeza, iaitu (seperti yang ditulis sebelum kata):
menjalankan dan memantau tugas pada bilangan mesin yang tidak terhad (sebanyak Saderi / Kubernetes dan hati nurani anda akan membenarkan anda)
dengan penjanaan aliran kerja dinamik daripada kod Python yang sangat mudah untuk ditulis dan difahami
dan keupayaan untuk menyambung mana-mana pangkalan data dan API antara satu sama lain menggunakan kedua-dua komponen siap pakai dan pemalam buatan sendiri (yang sangat mudah).
Kami menggunakan Apache Airflow seperti ini:
kami mengumpul data daripada pelbagai sumber (banyak contoh SQL Server dan PostgreSQL, pelbagai API dengan metrik aplikasi, malah 1C) dalam DWH dan ODS (kami mempunyai Vertica dan Clickhouse).
betapa majunya cron, yang memulakan proses penyatuan data pada ODS, dan juga memantau penyelenggaraannya.
Sehingga baru-baru ini, keperluan kami dilindungi oleh satu pelayan kecil dengan 32 teras dan 50 GB RAM. Dalam Aliran Udara, ini berfungsi:
lebih 200 hari (sebenarnya aliran kerja, di mana kami mengisi tugasan),
dalam setiap secara purata 70 tugasan,
kebaikan ini bermula (juga secara purata) sekali sejam.
Dan tentang bagaimana kami berkembang, saya akan menulis di bawah, tetapi sekarang mari kita tentukan masalah ΓΌber yang akan kita selesaikan:
Terdapat tiga Pelayan SQL sumber, masing-masing dengan 50 pangkalan data - contoh satu projek, masing-masing, mereka mempunyai struktur yang sama (hampir di mana-mana, mua-ha-ha), yang bermaksud bahawa setiap satu mempunyai jadual Pesanan (mujurlah, jadual dengan itu nama boleh ditolak ke dalam mana-mana perniagaan). Kami mengambil data dengan menambahkan medan perkhidmatan (pelayan sumber, pangkalan data sumber, ID tugas ETL) dan membuangnya secara naif, katakan, Vertica.
Mari kita pergi!
Bahagian utama, praktikal (dan sedikit teori)
Mengapa kami (dan anda)
Apabila pokok-pokok besar dan saya sederhana SQL-schik dalam satu runcit Rusia, kami menipu proses ETL aka aliran data menggunakan dua alat yang tersedia untuk kami:
Pusat Kuasa Informatica - sistem yang sangat meluas, sangat produktif, dengan perkakasannya sendiri, versinya sendiri. Saya menggunakan Tuhan melarang 1% daripada keupayaannya. kenapa? Baiklah, pertama sekali, antara muka ini, di suatu tempat dari tahun 380-an, secara mental memberi tekanan kepada kami. Kedua, alat ini direka untuk proses yang sangat mewah, penggunaan semula komponen yang marah dan helah-helah perusahaan yang sangat penting. Mengenai kosnya, seperti sayap Airbus AXNUMX / tahun, kami tidak akan mengatakan apa-apa.
Berhati-hati, tangkapan skrin boleh mencederakan orang di bawah umur 30 tahun sedikit
Pelayan Integrasi Pelayan SQL - kami menggunakan rakan seperjuangan ini dalam aliran dalam projek kami. Sebenarnya: kami sudah menggunakan SQL Server, dan entah bagaimana tidak munasabah untuk tidak menggunakan alatan ETLnya. Segala-galanya di dalamnya adalah baik: kedua-dua antara muka adalah cantik, dan laporan kemajuan ... Tetapi ini bukan sebab kami menyukai produk perisian, oh, bukan untuk ini. Versi itu dtsx (iaitu XML dengan nod yang dikocok semasa simpan) kita boleh, tetapi apa gunanya? Bagaimana pula dengan membuat pakej tugas yang akan menyeret beratus-ratus jadual dari satu pelayan ke pelayan yang lain? Ya, apa yang seratus, jari telunjuk anda akan jatuh dari dua puluh keping, klik pada butang tetikus. Tetapi ia pasti kelihatan lebih bergaya:
Kami sudah tentu mencari jalan keluar. Malah kes hampir datang ke penjana pakej SSIS yang ditulis sendiri ...
β¦dan kemudian pekerjaan baharu menemui saya. Dan Apache Airflow mengatasi saya di atasnya.
Apabila saya mengetahui bahawa perihalan proses ETL adalah kod Python yang mudah, saya tidak menari dengan gembira. Beginilah cara aliran data diversi dan dibezakan, dan menuangkan jadual dengan struktur tunggal daripada ratusan pangkalan data ke dalam satu sasaran menjadi masalah kod Python dalam satu setengah atau dua skrin 13 β.
Memasang kluster
Jangan kita susun tadika sepenuhnya, dan jangan bercakap tentang perkara yang jelas di sini, seperti memasang Aliran Udara, pangkalan data pilihan anda, Saderi dan kes lain yang diterangkan dalam dok.
Supaya kita boleh segera memulakan eksperimen, saya melakar docker-compose.yml di mana:
Mari kita naikkan sebenarnya Aliran udara: Penjadual, Pelayan Web. Bunga juga akan berputar di sana untuk memantau tugasan Saderi (kerana ia telah dimasukkan ke dalam apache/airflow:1.10.10-python3.7, tetapi kami tidak keberatan)
PostgreSQL, di mana Airflow akan menulis maklumat perkhidmatannya (data penjadual, statistik pelaksanaan, dsb.), dan Celery akan menandakan tugasan yang telah selesai;
Redis, yang akan bertindak sebagai broker tugas untuk Celery;
Pekerja saderi, yang akan terlibat dalam pelaksanaan tugas secara langsung.
Ke folder ./dags kami akan menambah fail kami dengan penerangan dags. Mereka akan diambil dengan cepat, jadi tidak perlu menyulap keseluruhan timbunan selepas setiap bersin.
Di sesetengah tempat, kod dalam contoh tidak ditunjukkan sepenuhnya (supaya tidak mengacaukan teks), tetapi di suatu tempat ia diubah suai dalam proses. Contoh kod kerja lengkap boleh didapati dalam repositori https://github.com/dm-logv/airflow-tutorial.
Dalam pemasangan komposisi, saya banyak bergantung pada imej yang terkenal aliran udara puckel/docker - pastikan anda menyemaknya. Mungkin anda tidak memerlukan apa-apa lagi dalam hidup anda.
Semua tetapan Aliran Udara tersedia bukan sahaja melalui airflow.cfg, tetapi juga melalui pembolehubah persekitaran (terima kasih kepada pembangun), yang saya ambil kesempatan secara berniat jahat.
Sememangnya, ia tidak sedia untuk pengeluaran: Saya sengaja tidak meletakkan degupan jantung pada bekas, saya tidak peduli dengan keselamatan. Tetapi saya melakukan minimum yang sesuai untuk penguji kami.
Ambil perhatian bahawa:
Folder dag mesti boleh diakses oleh penjadual dan pekerja.
Perkara yang sama berlaku untuk semua perpustakaan pihak ketiga - semuanya mesti dipasang pada mesin dengan penjadual dan pekerja.
Nah, sekarang mudah:
$ docker-compose up --scale worker=3
Selepas semuanya meningkat, anda boleh melihat antara muka web:
Jika anda tidak memahami apa-apa dalam semua "dags" ini, maka berikut ialah kamus pendek:
Berjadual - bapa saudara yang paling penting dalam Aliran Udara, yang mengawal bahawa robot bekerja keras, dan bukan seseorang: memantau jadual, mengemas kini hari, melancarkan tugas.
Secara umum, dalam versi yang lebih lama, dia menghadapi masalah dengan ingatan (tidak, bukan amnesia, tetapi bocor) dan parameter warisan masih kekal dalam konfigurasi run_duration β selang mula semula. Tetapi kini semuanya baik-baik saja.
DAG (aka "dag") - "graf asiklik terarah", tetapi definisi sedemikian akan memberitahu beberapa orang, tetapi sebenarnya ia adalah bekas untuk tugas yang berinteraksi antara satu sama lain (lihat di bawah) atau analog Pakej dalam SSIS dan Aliran Kerja dalam Informatica .
Sebagai tambahan kepada dag, mungkin masih terdapat subdag, tetapi kemungkinan besar kita tidak akan mendapatkannya.
Larian DAG - dag yang dimulakan, yang diperuntukkan sendiri execution_date. Dagrans daripada dag yang sama boleh berfungsi secara selari (jika anda membuat tugas anda menjadi idempotent, sudah tentu).
operator ialah kepingan kod yang bertanggungjawab untuk melakukan tindakan tertentu. Terdapat tiga jenis operator:
tindakanseperti kegemaran kami PythonOperator, yang boleh melaksanakan mana-mana kod Python (sah);
memindahkan, yang mengangkut data dari satu tempat ke satu tempat, katakan, MsSqlToHiveTransfer;
sensor sebaliknya, ia akan membolehkan anda bertindak balas atau memperlahankan pelaksanaan dag selanjutnya sehingga peristiwa berlaku. HttpSensor boleh menarik titik akhir yang ditentukan, dan apabila respons yang diingini sedang menunggu, mulakan pemindahan GoogleCloudStorageToS3Operator. Fikiran yang ingin tahu akan bertanya: βkenapa? Lagipun, anda boleh melakukan pengulangan terus dalam operator!β Dan kemudian, untuk tidak menyumbat kumpulan tugas dengan pengendali yang digantung. Penderia bermula, menyemak dan mati sebelum percubaan seterusnya.
Petugas - pengendali yang diisytiharkan, tanpa mengira jenis, dan dilampirkan pada dag dinaikkan pangkat kepada tugas.
contoh tugas - apabila perancang am memutuskan bahawa sudah tiba masanya untuk menghantar tugas ke dalam pertempuran pada pekerja-pekerja (tepat di tempat, jika kita menggunakan LocalExecutor atau ke nod jauh dalam kes CeleryExecutor), ia memberikan konteks kepada mereka (iaitu, satu set pembolehubah - parameter pelaksanaan), mengembangkan templat perintah atau pertanyaan dan mengumpulkannya.
Kami menjana tugas
Mula-mula, mari kita gariskan skema umum doug kami, dan kemudian kami akan menyelami butiran lebih banyak lagi, kerana kami menggunakan beberapa penyelesaian yang tidak remeh.
Jadi, dalam bentuk yang paling mudah, dag seperti ini akan kelihatan seperti ini:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Mari kita fikirkan:
Pertama, kami mengimport lib yang diperlukan dan sesuatu yang lain;
sql_server_ds - Adakah List[namedtuple[str, str]] dengan nama sambungan daripada Sambungan Aliran Udara dan pangkalan data dari mana kami akan mengambil plat kami;
dag - pengumuman hari kami, yang semestinya ada globals(), jika tidak Aliran Udara tidak akan menemuinya. Doug juga perlu berkata:
siapa nama dia orders - nama ini kemudiannya akan muncul dalam antara muka web,
bahawa dia akan bekerja dari tengah malam pada lapan Julai,
dan ia sepatutnya berjalan, kira-kira setiap 6 jam (untuk lelaki yang lasak di sini dan bukannya timedelta() boleh diterima cron-baris 0 0 0/6 ? * * *, untuk yang kurang keren - ungkapan seperti @daily);
workflow() akan melakukan kerja utama, tetapi bukan sekarang. Buat masa ini, kami hanya akan membuang konteks kami ke dalam log.
Dan kini keajaiban mudah untuk mencipta tugas:
kami menjalankan sumber kami;
mulakan PythonOperator, yang akan melaksanakan dummy kami workflow(). Jangan lupa untuk menentukan nama unik (dalam dag) tugas dan mengikat dag itu sendiri. Bendera provide_context seterusnya, akan menuangkan hujah tambahan ke dalam fungsi, yang akan kami kumpulkan dengan teliti menggunakan **context.
Buat masa ini, itu sahaja. Apa yang kami dapat:
dag baharu dalam antara muka web,
satu setengah ratus tugasan yang akan dilaksanakan secara selari (jika aliran Udara, tetapan Saderi dan kapasiti pelayan membenarkannya).
Nah, hampir mendapatnya.
Siapa yang akan memasang kebergantungan?
Untuk memudahkan semua perkara ini, saya kacau docker-compose.yml pemprosesan requirements.txt pada semua nod.
Kini ia hilang:
Petak kelabu ialah contoh tugas yang diproses oleh penjadual.
Kami tunggu sebentar, tugasan diambil oleh pekerja:
Yang hijau sudah tentu berjaya menyiapkan kerja mereka. Merah tidak begitu berjaya.
By the way, tiada folder pada prod kami ./dags, tiada penyegerakan antara mesin - semua dags terletak git pada Gitlab kami dan Gitlab CI mengedarkan kemas kini kepada mesin apabila bergabung master.
Sedikit tentang Bunga
Semasa pekerja membelasah puting kita, mari kita ingat alat lain yang boleh menunjukkan sesuatu kepada kita - Bunga.
Halaman pertama dengan maklumat ringkasan mengenai nod pekerja:
Halaman yang paling sengit dengan tugasan yang berfungsi:
Halaman paling membosankan dengan status broker kami:
Halaman paling terang adalah dengan graf status tugas dan masa pelaksanaannya:
Kami memuatkan yang kurang dimuatkan
Jadi, semua tugas telah berjaya, anda boleh membawa pergi yang cedera.
Dan terdapat ramai yang cedera - untuk satu sebab atau yang lain. Dalam kes penggunaan Aliran Udara yang betul, petak ini menunjukkan bahawa data pasti tidak sampai.
Anda perlu melihat log dan mulakan semula contoh tugas yang telah jatuh.
Dengan mengklik mana-mana petak, kami akan melihat tindakan yang tersedia untuk kami:
Anda boleh mengambil dan membuat Clear yang jatuh. Iaitu, kita lupa bahawa sesuatu telah gagal di sana, dan tugas contoh yang sama akan pergi ke penjadual.
Adalah jelas bahawa melakukan ini dengan tetikus dengan semua petak merah adalah tidak berperikemanusiaan - ini bukan apa yang kita harapkan daripada Aliran Udara. Sememangnya, kita mempunyai senjata pemusnah besar-besaran: Browse/Task Instances
Mari pilih semuanya sekaligus dan tetapkan semula kepada sifar, klik item yang betul:
Selepas pembersihan, teksi kami kelihatan seperti ini (mereka sudah menunggu penjadual untuk menjadualkannya):
Sambungan, cangkuk dan pembolehubah lain
Sudah tiba masanya untuk melihat DAG seterusnya, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""ΠΠΎΡΠΏΠΎΠ΄Π° Ρ ΠΎΡΠΎΡΠΈΠ΅, ΠΎΡΡΠ΅ΡΡ ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½Ρ"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
ΠΠ°ΡΠ°Ρ, ΠΏΡΠΎΡΡΠΏΠ°ΠΉΡΡ, ΠΌΡ {{ dag.dag_id }} ΡΡΠΎΠ½ΠΈΠ»ΠΈ
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Adakah semua orang pernah melakukan kemas kini laporan? Ini dia lagi: terdapat senarai sumber dari mana untuk mendapatkan data; terdapat senarai di mana untuk meletakkan; jangan lupa membunyikan hon apabila segala-galanya berlaku atau rosak (baik, ini bukan tentang kita, tidak).
Mari kita semak semula fail itu dan lihat perkara baharu yang tidak jelas:
from commons.operators import TelegramBotSendMessage - tiada apa yang menghalang kami daripada membuat pengendali kami sendiri, yang kami manfaatkan dengan membuat pembungkus kecil untuk menghantar mesej kepada Disekat. (Kami akan bercakap lebih lanjut mengenai operator ini di bawah);
default_args={} - dag boleh mengedarkan hujah yang sama kepada semua pengendalinya;
to='{{ var.value.all_the_kings_men }}' - padang to kami tidak akan mempunyai kod keras, tetapi dijana secara dinamik menggunakan Jinja dan pembolehubah dengan senarai e-mel, yang saya masukkan dengan teliti Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS β syarat untuk memulakan operator. Dalam kes kami, surat itu akan dihantar kepada bos hanya jika semua tanggungan telah berjaya berjaya;
tg_bot_conn_id='tg_main' - hujah conn_id terima ID sambungan yang kami buat Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - mesej dalam Telegram akan terbang hanya jika terdapat tugas yang gagal;
task_concurrency=1 - kami melarang pelancaran serentak beberapa contoh tugas bagi satu tugas. Jika tidak, kami akan mendapat pelancaran serentak beberapa VerticaOperator (melihat satu meja);
report_update >> [email, tg] - semuanya VerticaOperator berkumpul dalam menghantar surat dan mesej, seperti ini:
Tetapi oleh kerana pengendali pemberitahuan mempunyai syarat pelancaran yang berbeza, hanya satu yang akan berfungsi. Dalam Tree View, semuanya kelihatan kurang visual:
Saya akan mengatakan beberapa perkataan tentang makro dan rakan-rakan mereka - pembolehubah.
Makro ialah ruang letak Jinja yang boleh menggantikan pelbagai maklumat berguna ke dalam hujah operator. Sebagai contoh, seperti ini:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} akan berkembang kepada kandungan pembolehubah konteks execution_date dalam format YYYY-MM-DD: 2020-07-14. Bahagian yang terbaik ialah pembolehubah konteks dipaku pada contoh tugas tertentu (segi empat dalam Paparan Pokok), dan apabila dimulakan semula, ruang letak akan berkembang kepada nilai yang sama.
Nilai yang diberikan boleh dilihat menggunakan butang Rendered pada setiap contoh tugas. Beginilah tugas menghantar surat:
Dan seterusnya pada tugas dengan menghantar mesej:
Senarai lengkap makro terbina dalam untuk versi terkini tersedia tersedia di sini: rujukan makro
Selain itu, dengan bantuan pemalam, kami boleh mengisytiharkan makro kami sendiri, tetapi itu cerita lain.
Sebagai tambahan kepada perkara yang telah ditetapkan, kita boleh menggantikan nilai pembolehubah kami (saya sudah menggunakan ini dalam kod di atas). Jom buat dalam Admin/Variables beberapa perkara:
hanya gunakan laluan ke kunci yang dikehendaki: {{ var.json.bot_config.bot.token }}.
Saya benar-benar akan menyebut satu perkataan dan menunjukkan satu tangkapan skrin tentang sambungan. Segala-galanya adalah asas di sini: pada halaman Admin/Connections kami membuat sambungan, menambah log masuk / kata laluan kami dan lebih banyak parameter khusus di sana. seperti ini:
Kata laluan boleh disulitkan (lebih teliti daripada lalai), atau anda boleh meninggalkan jenis sambungan (seperti yang saya lakukan untuk tg_main) - hakikatnya ialah senarai jenis dirangkai dalam model Aliran Udara dan tidak boleh dikembangkan tanpa masuk ke dalam kod sumber (jika tiba-tiba saya tidak google sesuatu, sila betulkan saya), tetapi tiada apa yang akan menghalang kami daripada mendapatkan kredit hanya dengan nama.
Anda juga boleh membuat beberapa sambungan dengan nama yang sama: dalam kes ini, kaedah BaseHook.get_connection(), yang memberi kita sambungan dengan nama, akan memberi rawak daripada beberapa nama (ia akan menjadi lebih logik untuk membuat Round Robin, tetapi mari kita serahkan pada hati nurani pemaju Airflow).
Pembolehubah dan Sambungan sememangnya alat yang hebat, tetapi penting untuk tidak kehilangan keseimbangan: bahagian aliran anda yang mana anda simpan dalam kod itu sendiri, dan bahagian mana yang anda berikan kepada Aliran Udara untuk simpanan. Di satu pihak, ia boleh menjadi mudah untuk menukar nilai dengan cepat, contohnya, kotak mel, melalui UI. Sebaliknya, ini masih merupakan pengembalian kepada klik tetikus, yang mana kami (saya) ingin singkirkan.
Bekerja dengan sambungan adalah salah satu tugas cangkuk. Secara umum, cangkuk Aliran Udara ialah titik untuk menyambungkannya ke perkhidmatan dan perpustakaan pihak ketiga. Cth, JiraHook akan membuka pelanggan untuk kami berinteraksi dengan Jira (anda boleh mengalihkan tugas ke sana ke mari), dan dengan bantuan SambaHook anda boleh menolak fail tempatan ke smb-titik.
Menghuraikan pengendali tersuai
Dan kami hampir melihat bagaimana ia dibuat TelegramBotSendMessage
Kod commons/operators.py dengan pengendali sebenar:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Di sini, seperti semua yang lain dalam Aliran Udara, semuanya sangat mudah:
Diwarisi daripada BaseOperator, yang melaksanakan beberapa perkara khusus Aliran Udara (lihat masa lapang anda)
Medan yang diisytiharkan template_fields, di mana Jinja akan mencari makro untuk diproses.
Disusun hujah yang tepat untuk __init__(), tetapkan lalai jika perlu.
Kami juga tidak lupa tentang permulaan nenek moyang.
Membuka cangkuk yang sepadan TelegramBotHookmenerima objek pelanggan daripadanya.
Kaedah ganti (ditakrifkan semula). BaseOperator.execute(), yang Airfow akan berkedut apabila tiba masanya untuk melancarkan pengendali - di dalamnya kami akan melaksanakan tindakan utama, terlupa untuk log masuk. (Kami log masuk, dengan cara itu, terus masuk stdout ΠΈ stderr - Aliran udara akan memintas segala-galanya, membalutnya dengan cantik, menguraikannya di mana perlu.)
Mari lihat apa yang kita ada commons/hooks.py. Bahagian pertama fail, dengan cangkuk itu sendiri:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Saya tidak tahu apa yang hendak dijelaskan di sini, saya hanya akan ambil perhatian perkara penting:
Kami mewarisi, fikirkan tentang hujah - dalam kebanyakan kes ia akan menjadi satu: conn_id;
Mengatasi kaedah standard: Saya mengehadkan diri saya get_conn(), di mana saya mendapat parameter sambungan mengikut nama dan hanya mendapatkan bahagian itu extra (ini adalah medan JSON), di mana saya (mengikut arahan saya sendiri!) meletakkan token bot Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Saya mencipta contoh kami TelegramBot, memberikannya token khusus.
Itu sahaja. Anda boleh mendapatkan pelanggan dari cangkuk menggunakan TelegramBotHook().clent atau TelegramBotHook().get_conn().
Dan bahagian kedua fail, di mana saya membuat pembungkus mikro untuk API REST Telegram, supaya tidak menyeret perkara yang sama python-telegram-bot untuk satu kaedah sendMessage.
Cara yang betul ialah menambah semuanya: TelegramBotSendMessage, TelegramBotHook, TelegramBot - dalam pemalam, masukkan ke dalam repositori awam, dan berikan kepada Sumber Terbuka.
Semasa kami mengkaji semua ini, kemas kini laporan kami berjaya gagal dan menghantar mesej ralat kepada saya dalam saluran. Saya akan menyemak sama ada ia salah...
Sesuatu telah berlaku pada anjing kami! Bukankah itu yang kita jangkakan? Tepat sekali!
Adakah anda akan mencurahkan?
Adakah anda rasa saya terlepas sesuatu? Nampaknya dia berjanji untuk memindahkan data dari SQL Server ke Vertica, dan kemudian dia mengambilnya dan mengalihkan topik, bajingan!
Kekejaman ini disengajakan, saya hanya perlu menguraikan beberapa istilah untuk anda. Sekarang anda boleh pergi lebih jauh.
Rancangan kami adalah ini:
Do dag
Hasilkan tugasan
Lihat betapa indahnya semuanya
Berikan nombor sesi untuk diisi
Dapatkan data daripada SQL Server
Masukkan data ke dalam Vertica
Kumpul statistik
Oleh itu, untuk melaksanakan semua ini dan berjalan, saya membuat tambahan kecil kepada kami docker-compose.yml:
Vertica sebagai hos dwh dengan tetapan lalai yang paling banyak,
tiga contoh SQL Server,
kami mengisi pangkalan data dalam yang terakhir dengan beberapa data (sekali-kali jangan lihat mssql_init.py!)
Kami melancarkan semua kebaikan dengan bantuan arahan yang lebih rumit daripada kali terakhir:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Apa yang dihasilkan oleh pengacak ajaib kami, anda boleh menggunakan item tersebut Data Profiling/Ad Hoc Query:
Perkara utama bukanlah untuk menunjukkannya kepada penganalisis
menghuraikan sesi ETL Saya tidak akan, semuanya remeh di sana: kami membuat asas, terdapat tanda di dalamnya, kami membungkus segala-galanya dengan pengurus konteks, dan kini kami melakukan ini:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Masanya telah tiba kumpul data kami dari satu setengah ratus meja kami. Mari lakukan ini dengan bantuan baris yang sangat bersahaja:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Dengan bantuan cangkuk yang kami dapat dari Aliran Udara pymssql-bersambung
Mari kita gantikan sekatan dalam bentuk tarikh ke dalam permintaan - ia akan dimasukkan ke dalam fungsi oleh enjin templat.
Memenuhi permintaan kami pandassiapa yang akan dapatkan kita DataFrame - ia akan berguna kepada kita pada masa hadapan.
Saya menggunakan penggantian {dt} bukannya parameter permintaan %s bukan kerana saya Pinocchio yang jahat, tetapi kerana pandas tak boleh handle pymssql dan tergelincir yang terakhir params: Listwalaupun dia benar-benar mahu tuple.
Juga ambil perhatian bahawa pemaju pymssql memutuskan untuk tidak menyokongnya lagi, dan sudah tiba masanya untuk keluar pyodbc.
Mari lihat apakah Airflow mengisi hujah fungsi kami dengan:
Jika tiada data, maka tiada gunanya meneruskan. Tetapi ia juga pelik untuk menganggap pengisian berjaya. Tetapi ini bukan satu kesilapan. A-ah-ah, apa nak buat?! Dan inilah yang:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException akan memberitahu Airflow bahawa tiada ralat, tetapi kami melangkau tugas itu. Antara muka tidak akan mempunyai segi empat sama hijau atau merah, tetapi merah jambu.
ID sesi banjir kami (ia akan berbeza untuk setiap tugas),
Cincang dari sumber dan ID pesanan - supaya dalam pangkalan data akhir (di mana segala-galanya dituangkan ke dalam satu jadual) kami mempunyai ID pesanan yang unik.
Langkah kedua terakhir kekal: tuangkan segala-galanya ke Vertica. Dan, anehnya, salah satu cara yang paling hebat dan berkesan untuk melakukan ini ialah melalui CSV!
Pada jualan, kami mencipta plat sasaran secara manual. Di sini saya membenarkan diri saya menggunakan mesin kecil:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
saya menggunakan VerticaOperator() Saya mencipta skema pangkalan data dan jadual (jika ia belum wujud, sudah tentu). Perkara utama ialah mengatur kebergantungan dengan betul:
- Nah, - kata tikus kecil, - bukan, sekarang
Adakah anda yakin bahawa saya adalah haiwan yang paling dahsyat di dalam hutan?
Julia Donaldson, The Gruffalo
Saya fikir jika rakan sekerja saya dan saya mempunyai persaingan: siapa yang akan cepat mencipta dan melancarkan proses ETL dari awal: mereka dengan SSIS dan tetikus mereka dan saya dengan Aliran Udara ... Dan kemudian kami juga akan membandingkan kemudahan penyelenggaraan ... Wah, saya rasa anda akan bersetuju bahawa saya akan mengalahkan mereka di semua bahagian!
Jika lebih serius, maka Apache Airflow - dengan menerangkan proses dalam bentuk kod program - melakukan tugas saya banyak lebih selesa dan menyeronokkan.
Kebolehlanjutan tanpa hadnya, baik dari segi pemalam dan kecenderungan kepada skalabiliti, memberi anda peluang untuk menggunakan Aliran Udara di hampir mana-mana kawasan: walaupun dalam kitaran penuh mengumpul, menyediakan dan memproses data, walaupun dalam melancarkan roket (ke Marikh, daripada kursus).
Bahagian akhir, rujukan dan maklumat
Garu yang kami kumpulkan untuk anda
start_date. Ya, ini sudah menjadi meme tempatan. Melalui hujah utama Doug start_date semua lulus. Secara ringkas, jika anda nyatakan dalam start_date tarikh semasa, dan schedule_interval - satu hari, maka DAG akan bermula esok tidak lebih awal.
start_date = datetime(2020, 7, 7, 0, 1, 2)
Dan tiada lagi masalah.
Terdapat satu lagi ralat masa jalan yang dikaitkan dengannya: Task is missing the start_date parameter, yang paling kerap menunjukkan bahawa anda terlupa untuk mengikat operator dag.
Semua pada satu mesin. Ya, dan pangkalan (Aliran Udara sendiri dan salutan kami), dan pelayan web, dan penjadual, dan pekerja. Dan ia juga berjaya. Tetapi dari masa ke masa, bilangan tugas untuk perkhidmatan meningkat, dan apabila PostgreSQL mula bertindak balas kepada indeks dalam 20 s dan bukannya 5 ms, kami mengambilnya dan membawanya pergi.
LocalExecutor. Ya, kami masih duduk di atasnya, dan kami sudah sampai ke tepi jurang. LocalExecutor telah mencukupi untuk kami setakat ini, tetapi kini tiba masanya untuk berkembang dengan sekurang-kurangnya seorang pekerja, dan kami perlu bekerja keras untuk berpindah ke CeleryExecutor. Dan memandangkan fakta bahawa anda boleh bekerja dengannya pada satu mesin, tiada apa yang menghalang anda daripada menggunakan Saderi walaupun pada pelayan, yang "sudah tentu, tidak akan pernah dikeluarkan, secara jujur!"
Tidak guna alatan terbina dalam:
Sambungan untuk menyimpan bukti kelayakan perkhidmatan,
SLA Rindu untuk bertindak balas terhadap tugas yang tidak berjaya pada masanya,
xcom untuk pertukaran metadata (saya berkata metadata!) antara tugasan dag.
penyalahgunaan mel. Nah, apa yang boleh saya katakan? Makluman telah disediakan untuk semua pengulangan tugas yang gagal. Sekarang Gmail kerja saya mempunyai >90k e-mel daripada Airflow dan muncung mel web enggan mengambil dan memadam lebih daripada 100 pada satu-satu masa.
Agar kami dapat bekerja lebih banyak lagi dengan kepala kami dan bukan dengan tangan kami, Airflow telah menyediakan untuk kami ini:
REST API - dia masih mempunyai status Eksperimen, yang tidak menghalangnya daripada bekerja. Dengan itu, anda bukan sahaja boleh mendapatkan maklumat tentang dag dan tugas, tetapi juga berhenti/mulakan hari, buat DAG Run atau pool.
CLI - banyak alatan tersedia melalui baris arahan yang bukan sahaja menyusahkan untuk digunakan melalui WebUI, tetapi secara amnya tiada. Sebagai contoh:
backfill diperlukan untuk memulakan semula contoh tugas.
Sebagai contoh, penganalisis datang dan berkata: "Dan anda, kawan, mempunyai data yang tidak masuk akal dari 1 hingga 13 Januari! Betulkan, betulkan, betulkan, betulkan!" Dan anda adalah seorang hob:
Perkhidmatan asas: initdb, resetdb, upgradedb, checkdb.
run, yang membolehkan anda menjalankan satu tugasan contoh, dan juga menjaringkan markah pada semua kebergantungan. Selain itu, anda boleh menjalankannya melalui LocalExecutor, walaupun anda mempunyai gugusan Saderi.
Melakukan perkara yang hampir sama test, hanya juga dalam pangkalan tidak menulis apa-apa.
connections membenarkan penciptaan besar-besaran sambungan daripada cangkerang.
API Python - cara berinteraksi yang agak tegar, yang bertujuan untuk pemalam, dan tidak mengerumuninya dengan tangan kecil. Tetapi siapa yang menghalang kita daripada pergi ke /home/airflow/dags, lari ipython dan mula mengarut? Anda boleh, sebagai contoh, mengeksport semua sambungan dengan kod berikut:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Menyambung ke pangkalan data aliran Udara. Saya tidak mengesyorkan menulis kepadanya, tetapi mendapatkan keadaan tugas untuk pelbagai metrik tertentu boleh menjadi lebih pantas dan lebih mudah daripada melalui mana-mana API.
Katakan bahawa tidak semua tugas kita adalah idempoten, tetapi kadangkala ia boleh jatuh, dan ini adalah perkara biasa. Tetapi beberapa sekatan sudah mencurigakan, dan perlu diperiksa.
Awas SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
rujukan
Dan sudah tentu, sepuluh pautan pertama daripada terbitan Google ialah kandungan folder Aliran Udara daripada penanda halaman saya.
Dokumentasi Aliran Udara Apache - sudah tentu, kita mesti bermula dengan pejabat. dokumentasi, tetapi siapa yang membaca arahan?
Amalan Terbaik - Baiklah, sekurang-kurangnya baca cadangan daripada pencipta.
UI Aliran Udara - permulaan: antara muka pengguna dalam gambar
Zen Python dan Apache Airflow - pemajuan DAG tersirat, melontar konteks dalam fungsi, sekali lagi tentang kebergantungan, dan juga tentang melangkau pelancaran tugas.