Сайн байна уу, намайг Дмитрий Логвиненко гэдэг - Везет группын аналитикийн хэлтсийн өгөгдлийн инженер.
Би ETL процессыг хөгжүүлэх гайхалтай хэрэгсэл болох Apache Airflow-ийн талаар танд хэлэх болно. Гэхдээ Airflow нь маш олон талт бөгөөд олон талт тул та өгөгдлийн урсгалд оролцдоггүй ч гэсэн аливаа процессыг үе үе эхлүүлж, тэдгээрийн гүйцэтгэлийг хянах шаардлагатай байсан ч үүнийг анхааралтай авч үзэх хэрэгтэй.
Тийм ээ, би хэлэхээс гадна харуулах болно: програм нь маш олон код, дэлгэцийн агшин, зөвлөмжтэй.
Google-ээс Airflow / Wikimedia Commons гэсэн үгийг ихэвчлэн хардаг
Агуулга
Танилцуулга
Apache Airflow яг л Django шиг:
- питоноор бичсэн
- гайхалтай админ самбар байна,
- хязгааргүй өргөтгөх боломжтой
- зөвхөн илүү сайн, энэ нь огт өөр зорилгоор хийгдсэн, тухайлбал (катагийн өмнө бичсэнчлэн):
- Хязгааргүй тооны машинууд дээр даалгавруудыг гүйцэтгэх, хянах (олон Селөдерей / Кубернете болон таны ухамсрын зөвшөөрлөөр)
- Python кодыг бичих, ойлгоход маш хялбараас эхлээд динамик ажлын урсгал үүсгэх
- бэлэн бүрэлдэхүүн хэсгүүд болон гэрийн нэмэлт өргөтгөлүүдийг ашиглан аливаа мэдээллийн сан болон API-г хооронд нь холбох чадвар (энэ нь маш энгийн).
Бид Apache Airflow-ийг дараах байдлаар ашигладаг.
- Бид DWH болон ODS-д янз бүрийн эх сурвалжаас (олон SQL Server болон PostgreSQL жишээнүүд, програмын хэмжигдэхүүн бүхий төрөл бүрийн API, бүр 1С) өгөгдөл цуглуулдаг (бидэнд Vertica болон Clickhouse байдаг).
- хэр дэвшилтэт
cron
, энэ нь ODS дээр өгөгдөл нэгтгэх процессыг эхлүүлж, мөн тэдгээрийн засвар үйлчилгээг хянадаг.
Саяхныг хүртэл бидний хэрэгцээг 32 цөм, 50 ГБ RAM бүхий нэг жижиг сервер хангадаг байсан. Airflow-д энэ нь ажилладаг:
- дэлгэрэнгүй 200 төгрөг (үнэндээ бидний даалгавруудыг бөглөсөн ажлын урсгалууд),
- тус бүрд дунджаар 70 даалгавар,
- энэ сайн сайхан эхэлдэг (мөн дунджаар) цагт нэг удаа.
Бид хэрхэн өргөжсөн талаар би доор бичих болно, гэхдээ одоо бид шийдэх über-асуудлаа тодорхойлъё.
Гурван анхны SQL сервер байдаг бөгөөд тус бүр нь 50 мэдээллийн сантай - нэг төслийн жишээнүүд нь ижил бүтэцтэй (бараг хаа сайгүй, муа-ха-ха) бөгөөд энэ нь тус бүр нь Захиалгын хүснэгттэй (аз болоход ийм хүснэгттэй) гэсэн үг юм. нэрийг ямар ч бизнест оруулж болно). Бид үйлчилгээний талбаруудыг (эх сервер, эх мэдээллийн сан, ETL даалгаврын ID) нэмэх замаар өгөгдлийг авч, Vertica руу оруулдаг.
Явъя!
Үндсэн хэсэг, практик (мөн бага зэрэг онолын)
Яагаад бид (мөн та)
Мод том байхад би энгийн байсан SQL
-schik нэг Оросын жижиглэнгийн худалдаанд бид ETL процессыг өгөгдлийн урсгал гэх хоёр хэрэгслийг ашиглан хууран мэхэлсэн.
- Informatica эрчим хүчний төв - маш их тархсан, маш бүтээмжтэй, өөрийн гэсэн техник хангамжтай, өөрийн хувилбартай систем. Би түүний чадавхийн 1% -ийг бурхан хориглохыг ашигласан. Яагаад? Юуны өмнө, 380-аад оны үеийн энэ интерфейс бидэнд оюун санааны хувьд дарамт учруулсан. Хоёрдугаарт, энэхүү төхөөрөмж нь маш гоёмсог процессууд, бүрэлдэхүүн хэсгүүдийн уур хилэнг дахин ашиглах болон бусад маш чухал аж ахуйн нэгжийн заль мэхэнд зориулагдсан болно. Энэ нь Airbus AXNUMX онгоцны далавч шиг үнэтэй байдаг талаар бид юу ч хэлэхгүй.
Скриншот нь 30-аас доош насны хүмүүст бага зэрэг хор хөнөөл учруулж болзошгүйг анхаараарай
- SQL Server Integration Server - Бид энэ нөхрийг төслийн дотоод урсгалдаа ашигласан. Үнэн хэрэгтээ: бид SQL Server-ийг аль хэдийн ашиглаж байгаа бөгөөд түүний ETL хэрэгслийг ашиглахгүй байх нь ямар нэгэн үндэслэлгүй байх болно. Энд байгаа бүх зүйл сайн байна: интерфэйс хоёулаа үзэсгэлэнтэй, явцын тайлангууд ... Гэхдээ энэ нь бид програм хангамжийн бүтээгдэхүүнд дуртай учраас биш юм. Хувилбар
dtsx
(энэ нь хадгалагдахдаа холилдсон зангилаатай XML юм) бид чадна, гэхдээ ямар учиртай вэ? Хэдэн зуун хүснэгтийг нэг серверээс нөгөө сервер рүү чирэх ажлын багц хийвэл ямар вэ? Тийм ээ, хэдэн зуун вэ, хулганы товчийг дарахад таны долоовор хуруу хорин ширхэгээс унах болно. Гэхдээ энэ нь мэдээж илүү загварлаг харагдаж байна:
Бид гарцыг хайж байсан нь гарцаагүй. Бүр тохиолдол бараг л өөрөө бичсэн SSIS багц үүсгэгч дээр ирсэн ...
... дараа нь надад шинэ ажил олдлоо. Тэгээд Apache Airflow намайг гүйцэж түрүүлэв.
ETL процессын тайлбар нь энгийн Python код гэдгийг мэдээд би зүгээр л баярласандаа бүжиглээгүй. Ингэж өгөгдлийн урсгалуудыг хувилбарлаж, ялгаж, олон зуун мэдээллийн сангаас нэг бүтэцтэй хүснэгтүүдийг нэг зорилт болгон цутгах нь нэг ба хагас эсвэл хоёр 13 ”дэлгэцэнд Python кодын асуудал болсон.
Кластерийг угсарч байна
Цэцэрлэгийг бүрэн зохион байгуулахгүй байцгаая, Airflow, таны сонгосон мэдээллийн сан, Селөдерей болон усан онгоцны зогсоол дээр тайлбарласан бусад тохиолдлуудыг суулгах гэх мэт тодорхой зүйлийг энд ярихгүй байцгаая.
Бид нэн даруй туршилтаа эхлүүлэхийн тулд би зурсан docker-compose.yml
Үүнд:
- Үнэндээ өсгөцгөөе Агаарын урсгал: Хуваарьлагч, вэб сервер. Цэцэг мөн тэнд эргэлдэж, Селөдерей даалгаврыг хянах болно (учир нь үүнийг аль хэдийн оруулчихсан
apache/airflow:1.10.10-python3.7
, гэхдээ бид дургүйцэхгүй) - PostgreSQL, үүнд Airflow өөрийн үйлчилгээний мэдээллээ (хуваарьлагчийн өгөгдөл, гүйцэтгэлийн статистик гэх мэт) бичих ба Celery дууссан ажлуудыг тэмдэглэх болно;
- Redis, энэ нь Celery-ийн даалгавар зуучлагчийн үүрэг гүйцэтгэнэ;
- Селөдерей ажилтан, даалгаврыг шууд гүйцэтгэх ажилд оролцоно.
- Хавтас руу
./dags
бид dags-ийн тайлбар бүхий файлуудаа нэмнэ. Тэднийг нисэх үед авах тул найтаах бүрийн дараа бүх стекийг жонглёрлох шаардлагагүй болно.
Зарим газарт жишээн дэх кодыг бүрэн харуулаагүй (текстийг эмх замбараагүй болгохгүйн тулд), гэхдээ хаа нэгтээ процессын явцад өөрчилсөн байдаг. Ажлын кодын бүрэн жишээг агуулахаас олж болно
https://github.com/dm-logv/airflow-tutorial .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- broker
Тэмдэглэл:
- Зохиолыг угсарч байхдаа би сайн мэддэг дүр төрхөд тулгуурласан
puckel/docker-airflow - заавал шалгаарай. Магадгүй таны амьдралд өөр юу ч хэрэггүй байх. - Агаарын урсгалын бүх тохиргоог зөвхөн дамжуулан авах боломжтой
airflow.cfg
, гэхдээ бас орчны хувьсагчаар дамжуулан (хөгжүүлэгчдийн ачаар) үүнийг би хорлонтойгоор ашигласан. - Мэдээжийн хэрэг, энэ нь үйлдвэрлэхэд бэлэн биш байна: би чингэлэг дээр зүрхний цохилтыг санаатайгаар тавиагүй, аюулгүй байдлын талаар санаа зовоогүй. Гэхдээ би манай туршилтанд тохирох хамгийн бага зүйлийг хийсэн.
- Тэрийг тэмдэглэ:
- Dag хавтас нь төлөвлөгч болон ажилчдын аль алинд нь хандах боломжтой байх ёстой.
- Гуравдагч талын бүх номын санд мөн адил хамаарна - тэдгээрийг бүгдийг нь хуваарьлагч, ажилчидтай машин дээр суулгасан байх ёстой.
За, одоо бүх зүйл энгийн:
$ docker-compose up --scale worker=3
Бүх зүйл нэмэгдсэний дараа та вэб интерфэйсүүдийг харж болно:
- Агаарын урсгал:
http://127.0.0.1:8080/admin/ - Цэцэг:
http://127.0.0.1:5555/dashboard
Үндсэн ухагдахуунууд
Хэрэв та эдгээр бүх "даг"-аас юу ч ойлгоогүй бол энд товч толь бичиг байна.
- Жагсаалт - Airflow-ын хамгийн чухал авга ах, хүн биш, роботууд шаргуу ажилладагийг хянадаг: цагийн хуваарийг хянаж, даалгавруудыг шинэчилж, даалгавруудыг эхлүүлдэг.
Ерөнхийдөө хуучин хувилбаруудад тэрээр санах ойтой холбоотой асуудалтай байсан (үгүй, амнези биш, харин гоожсон) бөгөөд хуучин параметр нь тохиргоонд хэвээр үлджээ.
run_duration
- түүний дахин эхлүүлэх интервал. Харин одоо бүх зүйл сайхан байна. - DAG ("Dag" гэх мэт) - "чиглүүлсэн мөчлөгийн график", гэхдээ ийм тодорхойлолт нь цөөхөн хүнд хэлэх болно, гэхдээ үнэн хэрэгтээ энэ нь бие биетэйгээ харьцах даалгаврын сав (доороос үзнэ үү) эсвэл SSIS дахь багц ба Informatica дахь ажлын урсгалын аналог юм. .
Дагнаас гадна дэд дагзууд байж болох ч бид тэдэнд хүрч чадахгүй байх магадлалтай.
- DAG Run - өөрийн гэсэн томилогдсон анхны даг
execution_date
. Ижил дагны дагрангууд зэрэгцэн ажиллах боломжтой (хэрэв та даалгавраа сулруулсан бол мэдээж хэрэг). - Оператор нь тодорхой үйлдлийг гүйцэтгэх үүрэгтэй кодын хэсгүүд юм. Гурван төрлийн оператор байдаг:
- үйл ажиллагаабидний дуртай шиг
PythonOperator
, ямар ч (хүчинтэй) Python кодыг ажиллуулж болно; - шилжүүлэх, мэдээллийг нэг газраас нөгөө рүү зөөвөрлөж байна
MsSqlToHiveTransfer
; - мэдрэгч нөгөө талаас, энэ нь танд ямар нэгэн үйл явдал гарах хүртэл хариу үйлдэл үзүүлэх эсвэл цаашдын гүйцэтгэлийг удаашруулах боломжийг олгоно.
HttpSensor
заасан төгсгөлийн цэгийг татах боломжтой бөгөөд хүссэн хариу хүлээж байх үед шилжүүлгийг эхлүүлнэGoogleCloudStorageToS3Operator
. Сонирхолтой оюун ухаан: "Яагаад? Эцсийн эцэст та оператор дээр давталт хийж болно!" Дараа нь түр зогсоосон операторуудтай даалгаврын санг бөглөхгүйн тулд. Мэдрэгч нь дараагийн оролдлогын өмнө ажиллаж, шалгаж, үхдэг.
- үйл ажиллагаабидний дуртай шиг
- Даалгавар - зарлагдсан операторууд, төрөл харгалзахгүй, дагшинд хавсаргасан албан тушаалын зэрэглэлд дэвшинэ.
- даалгаврын жишээ - ерөнхий төлөвлөгч үүрэг гүйцэтгэгч-ажилчид руу даалгавраа илгээх цаг болсон гэж шийдсэн үед (хэрэв бид ашиглаж байгаа бол газар дээр нь)
LocalExecutor
эсвэл тохиолдолд алсын зангилаа рууCeleryExecutor
), энэ нь тэдгээрт контекстийг оноож (өөрөөр хэлбэл, хувьсагчдын багц - гүйцэтгэлийн параметрүүд), тушаал эсвэл асуулгын загваруудыг өргөжүүлж, тэдгээрийг нэгтгэдэг.
Бид даалгавар үүсгэдэг
Эхлээд бид догдлогынхоо ерөнхий схемийг тоймлон авч үзье, дараа нь бид зарим нэг энгийн бус шийдлүүдийг ашигладаг тул нарийвчилсан мэдээллийг нарийвчлан судлах болно.
Тиймээс, хамгийн энгийн хэлбэрээр ийм ганга дараах байдлаар харагдах болно.
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Үүнийг юу болохыг мэдье.
- Нэгдүгээрт, бид шаардлагатай libs болон импортолдог Өөр ямар нэг зүйл;
sql_server_ds
Байна ууList[namedtuple[str, str]]
Агаарын урсгалын холболтын холболтын нэрс болон бидний хавтанг авах мэдээллийн сангуудын хамт;dag
- заавал байх ёстой манай дааганы зарлалglobals()
, эс бөгөөс Airflow үүнийг олохгүй. Даг мөн хэлэх хэрэгтэй:- түүний нэр хэн бэ
orders
- энэ нэр дараа нь вэб интерфэйс дээр гарч ирнэ, - XNUMX-р сарын XNUMX-ны шөнө дундаас эхлэн ажиллана гэж
- мөн энэ нь ойролцоогоор 6 цаг тутамд ажиллах ёстой (энд хатуу ширүүн залуусын оронд
timedelta()
зөвшөөрөгдөхcron
- шугам0 0 0/6 ? * * *
, бага дажгүй нь - гэх мэт илэрхийлэл@daily
);
- түүний нэр хэн бэ
workflow()
үндсэн ажлаа хийх болно, гэхдээ одоо биш. Одоохондоо бид контекстээ л лог руу оруулах болно.- Одоо даалгавар үүсгэх энгийн ид шид:
- бид эх сурвалжаа дамжуулдаг;
- эхлүүлэх
PythonOperator
, энэ нь бидний даммиг гүйцэтгэх болноworkflow()
. Даалгаврын өвөрмөц (даг дотор) нэрийг зааж өгөхөө бүү мартаарай. тугprovide_context
эргээд функцэд нэмэлт аргументуудыг оруулах бөгөөд бид үүнийг ашиглан анхааралтай цуглуулах болно**context
.
Одоохондоо энэ л байна. Бидний авсан зүйл:
- вэб интерфэйс дэх шинэ догол,
- Зэрэгцээ хийгдэх нэг хагас зуун даалгавар (хэрэв Агаарын урсгал, Селөдерей тохиргоо болон серверийн хүчин чадал зөвшөөрвөл).
За бараг ойлголоо.
Хэн хамаарлыг суулгах вэ?
Энэ бүх зүйлийг хялбарчлахын тулд би шургалсан docker-compose.yml
боловсруулах requirements.txt
бүх зангилаа дээр.
Одоо алга болсон:
Саарал дөрвөлжин нь хуваарь гаргагчийн боловсруулсан даалгавар юм.
Бид бага зэрэг хүлээж байна, ажилчид даалгавруудыг хураан авч байна:
Ногоонууд маань мэдээж ажлаа амжилттай дуусгасан. Улаанууд тийм ч амжилттай байдаггүй.
Дашрамд хэлэхэд манай бүтээгдэхүүн дээр хавтас байхгүй
./dags
, машинуудын хооронд ямар ч синхрончлол байхгүй - бүх дагууд оршдогgit
Манай Gitlab дээр байдаг бөгөөд Gitlab CI нь нэгдэх үед машинуудад шинэчлэлтүүдийг түгээдэгmaster
.
Цэцгийн тухай бага зэрэг
Ажилчид бидний соосгийг цохиж байх хооронд бидэнд ямар нэгэн зүйл харуулах өөр нэг хэрэгсэл болох Цэцэгийг санацгаая.
Ажилчдын зангилааны талаархи хураангуй мэдээлэл бүхий эхний хуудас:
Ажилд орсон ажлуудтай хамгийн эрчимтэй хуудас:
Манай брокерын статустай хамгийн уйтгартай хуудас:
Хамгийн тод хуудас нь даалгаврын статусын графикууд болон тэдгээрийн гүйцэтгэлийн хугацаа юм.
Бид дутуу ачааллыг ачаалдаг
Тиймээс бүх даалгавар биелсэн тул та шархадсан хүмүүсийг авч явах боломжтой.
Нэг шалтгааны улмаас шархадсан олон хүн байсан. Агаарын урсгалыг зөв ашиглах тохиолдолд эдгээр квадратууд нь өгөгдөл ирээгүй гэдгийг харуулж байна.
Та бүртгэлийг үзэж, унасан даалгаврын тохиолдлуудыг дахин эхлүүлэх хэрэгтэй.
Аль ч дөрвөлжин дээр дарснаар бид хийх боломжтой үйлдлүүдийг харах болно:
Та унасаныг аваад Clear хийж болно. Өөрөөр хэлбэл, бид тэнд ямар нэг зүйл бүтэлгүйтсэн гэдгийг мартаж, ижил төстэй даалгавар нь төлөвлөгч рүү очих болно.
Үүнийг бүх улаан дөрвөлжинтэй хулганаар хийх нь тийм ч хүмүүнлэг биш гэдэг нь тодорхой байна - энэ нь бидний Airflow-аас хүлээж байгаа зүйл биш юм. Мэдээжийн хэрэг, бид үй олноор хөнөөх зэвсэгтэй: Browse/Task Instances
Бүгдийг нэг дор сонгоод тэг болгоод зөв зүйл дээр дарна уу:
Цэвэрлэгээ хийсний дараа манай таксинууд иймэрхүү харагдаж байна (тэд хуваарь гаргагчийг хуваарь гаргахыг хүлээж байна):
Холболт, дэгээ болон бусад хувьсагч
Дараагийн DAG-ыг харах цаг болжээ. update_reports.py
:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Хүн бүр тайлангийн шинэчлэл хийж байсан уу? Энэ бол түүний дахин юм: өгөгдлийг хаанаас авах эх сурвалжуудын жагсаалт байдаг; хаана байрлуулах жагсаалт байдаг; Бүх зүйл тохиолдсон эсвэл эвдэрсэн үед дохио өгөхөө бүү мартаарай (за, энэ бидний тухай биш, үгүй).
Файлыг дахин үзэж, шинэ ойлгомжгүй зүйлсийг харцгаая:
from commons.operators import TelegramBotSendMessage
- Unblocked руу мессеж илгээх жижиг савлагаа хийснээр бид өөрсдийн операторуудыг бий болгоход юу ч саад болохгүй. (Бид энэ операторын талаар доор дэлгэрэнгүй ярих болно);default_args={}
- dag нь бүх операторууддаа ижил аргументуудыг тарааж чаддаг;to='{{ var.value.all_the_kings_men }}'
- талбайto
Бид хатуу кодчилолгүй, харин Жинжа ашиглан динамикаар үүсгэгдсэн, имэйлийн жагсаалт бүхий хувьсагчтай байх болно.Admin/Variables
;trigger_rule=TriggerRule.ALL_SUCCESS
— операторыг эхлүүлэх нөхцөл. Манай тохиолдолд бүх хамаарал шийдэгдсэн тохиолдолд л захидал дарга нарт очих болно амжилттай;tg_bot_conn_id='tg_main'
- аргументуудconn_id
бидний үүсгэсэн холболтын ID-г хүлээн авна ууAdmin/Connections
;trigger_rule=TriggerRule.ONE_FAILED
- Telegram дахь мессежүүд унасан даалгавар байгаа тохиолдолд л алга болно;task_concurrency=1
- бид нэг даалгаврын хэд хэдэн даалгаврыг нэгэн зэрэг эхлүүлэхийг хориглодог. Үгүй бол бид хэд хэдэн төхөөрөмжийг нэгэн зэрэг эхлүүлэх болноVerticaOperator
(нэг ширээ рүү харах);report_update >> [email, tg]
- бүгдVerticaOperator
дараах байдлаар захидал, мессеж илгээхдээ нэгдэх:
Гэхдээ мэдэгдэгч операторууд ажиллуулах нөхцөл өөр өөр байдаг тул зөвхөн нэг нь л ажиллах болно. Модны харагдац дээр бүх зүйл арай бага харагдахуйц харагдаж байна:
талаар хэдэн үг хэлье макро болон тэдний найзууд - хувьсагчууд.
Макро нь янз бүрийн хэрэгтэй мэдээллийг операторын аргумент болгон орлуулж чадах Жинжа орлуулагч юм. Жишээлбэл, иймэрхүү:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }}
контекст хувьсагчийн агуулга руу тэлэх болно execution_date
форматаар YYYY-MM-DD
: 2020-07-14
. Хамгийн сайн тал нь контекст хувьсагчдыг тодорхой даалгаврын жишээнд (Мод харах дахь дөрвөлжин) хадаж байгаа бөгөөд дахин эхлүүлэх үед орлуулагчид ижил утгатай болж өргөжих болно.
Даалгаврын жишээ бүр дээрх Rendered товчийг ашиглан оноож буй утгуудыг харж болно. Захидал илгээх даалгавар нь дараах байдалтай байна.
Мессеж илгээх даалгавар дээр:
Хамгийн сүүлийн хувилбарт зориулсан макроны бүрэн жагсаалтыг эндээс авах боломжтой:
Түүнээс гадна, залгаасуудын тусламжтайгаар бид өөрсдийн макро зарлах боломжтой, гэхдээ энэ бол өөр түүх юм.
Урьдчилан тодорхойлсон зүйлсээс гадна бид хувьсагчдынхаа утгыг орлуулж болно (би үүнийг дээрх кодонд аль хэдийн ашигласан). -д бүтээцгээе Admin/Variables
хэд хэдэн зүйл:
Таны ашиглаж болох бүх зүйл:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')
Утга нь скаляр эсвэл JSON байж болно. JSON тохиолдолд:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}
Хүссэн түлхүүр рүү хүрэх замыг ашиглана уу: {{ var.json.bot_config.bot.token }}
.
Би шууд утгаараа нэг үг хэлж, нэг дэлгэцийн агшинг үзүүлэх болно холболтууд. Энд бүх зүйл энгийн байдаг: хуудсан дээр Admin/Connections
Бид холболт үүсгэж, нэвтрэх / нууц үг болон илүү тодорхой параметрүүдийг нэмж оруулдаг. Үүн шиг:
Нууц үгийг шифрлэх боломжтой (өгөгдмөлөөс илүү нарийн) эсвэл та холболтын төрлийг орхиж болно (миний хийсэн шиг) tg_main
) - Үнэн хэрэгтээ эдгээр төрлүүдийн жагсаалтыг Airflow загварт холбосон бөгөөд эх код руу орохгүйгээр өргөтгөх боломжгүй (хэрэв би гэнэт Google-д ямар нэгэн зүйл оруулаагүй бол намайг засна уу), гэхдээ биднийг кредит авахад юу ч саад болохгүй. нэр.
Та мөн ижил нэртэй хэд хэдэн холболт хийж болно: энэ тохиолдолд арга BaseHook.get_connection()
, аль нь бидэнд нэрээр холболтуудыг авдаг, өгөх болно Санамсаргүй хэд хэдэн нэрсээс (Round Robin хийх нь илүү логик байх болно, гэхдээ үүнийг Airflow хөгжүүлэгчдийн ухамсарт үлдээе).
Хувьсагч ба Холболтууд нь мэдээж гайхалтай хэрэглүүр боловч тэнцвэрээ алдахгүй байх нь чухал: та урсгалынхаа аль хэсгийг кодонд хадгалдаг, мөн Airflow-д аль хэсгийг нь хадгалахад зориулж өгдөг. Нэг талаас, утгыг, жишээлбэл, шуудангийн хайрцгийг UI ашиглан хурдан өөрчлөхөд тохиромжтой байж болно. Нөгөөтэйгүүр, энэ нь бид (би) салахыг хүссэн хулганы товшилт руу буцах явдал хэвээр байна.
Холболттой ажиллах нь ажлын нэг юм дэгээ. Ерөнхийдөө Airflow дэгээ нь үүнийг гуравдагч талын үйлчилгээ, номын сантай холбох цэг юм. Жишээ нь, JiraHook
Jira-тай харилцах үйлчлүүлэгч нээх болно (та даалгавруудыг нааш цааш зөөх боломжтой), мөн SambaHook
та дотоод файлыг түлхэж болно smb
- цэг.
Захиалгат операторыг задлан шинжилж байна
Тэгээд бид үүнийг хэрхэн хийснийг харах дөхсөн TelegramBotSendMessage
Хууль commons/operators.py
бодит оператортой:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Энд, Airflow-ийн бусад бүх зүйлийн нэгэн адил бүх зүйл маш энгийн:
- -аас өвлөн авсан
BaseOperator
, энэ нь Агаарын урсгалын онцлогтой хэд хэдэн зүйлийг хэрэгжүүлдэг (чөлөөт цагаа хараарай) - Зарлагдсан талбарууд
template_fields
, үүнд Жинжа боловсруулах макро хайх болно. - Зөв аргументуудыг зохион байгуулсан
__init__()
, шаардлагатай бол анхдагч тохиргоог тохируулна уу. - Өвөг дээдсийн эхлэлийг ч бид мартаагүй.
- Холбогдох дэгээг нээв
TelegramBotHook
түүнээс үйлчлүүлэгчийн объектыг хүлээн авсан. - Дарагдсан (дахин тодорхойлсон) арга
BaseOperator.execute()
, операторыг ажиллуулах цаг ирэхэд Airfow нь тайрах болно - үүнд бид нэвтрэхээ мартаж үндсэн үйлдлийг хэрэгжүүлэх болно. (Дашрамд хэлэхэд бид шууд нэвтэрдэгstdout
иstderr
- Агаарын урсгал нь бүх зүйлийг таслан зогсоож, сайхан боож, шаардлагатай бол задлах болно.)
Бидэнд юу байгааг харцгаая commons/hooks.py
. Файлын эхний хэсэг, дэгээ өөрөө:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Би энд юу тайлбарлахаа ч мэдэхгүй байна, би зөвхөн чухал зүйлийг тэмдэглэх болно:
- Бид өвлөн авч, аргументуудын талаар бодож үзээрэй - ихэнх тохиолдолд энэ нь нэг байх болно:
conn_id
; - Стандарт аргуудыг давах: Би өөрийгөө хязгаарласан
get_conn()
, үүнд би холболтын параметрүүдийг нэрээр нь аваад зүгээр л хэсгийг авнаextra
(энэ бол JSON талбар), би (өөрийн зааврын дагуу!) Telegram бот жетоныг байрлуулсан:{"bot_token": "YOuRAwEsomeBOtToKen"}
. - Би бидний жишээг бий болгодог
TelegramBot
, тодорхой тэмдэг өгөх.
Тэгээд л болоо. Та дэгээ ашиглан үйлчлүүлэгч авч болно TelegramBotHook().clent
буюу TelegramBotHook().get_conn()
.
Файлын хоёр дахь хэсэг нь би Telegram REST API-д зориулж бичил боодол хийдэг бөгөөд үүнийг чирэхгүй байхын тулд python-telegram-bot
sendMessage
.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))
Зөв арга бол бүгдийг нь нэмэх явдал юм:
TelegramBotSendMessage
,TelegramBotHook
,TelegramBot
- залгаас дээр нийтийн репозитор руу оруулаад Нээлттэй эх рүү өгнө үү.
Бид энэ бүхнийг судалж байх хооронд манай тайлангийн шинэчлэл амжилтгүй болж, сувагт алдааны мессеж илгээсэн. Буруу эсэхийг шалгах гэж байна...
Манай нохойд ямар нэг зүйл эвдэрсэн! Энэ нь бидний хүлээж байсан зүйл биш гэж үү? Яг!
Та цутгах гэж байна уу?
Намайг ямар нэг зүйл алдсан гэж бодож байна уу? SQL Server-ээс Vertica руу өгөгдөл дамжуулна гэж амлаад аваад сэдвээсээ холдсон бололтой, новш!
Энэ харгислал санаатай байсан тул би танд зориулж зарим нэр томъёог тайлах хэрэгтэй болсон. Одоо та цааш явж болно.
Бидний төлөвлөгөө ийм байсан:
- Хий
- Даалгавруудыг бий болгох
- Бүх зүйл ямар үзэсгэлэнтэй болохыг хараарай
- Бөглөхөд сессийн дугаар оноо
- SQL серверээс өгөгдөл авах
- Vertica руу өгөгдөл оруулах
- Статистик мэдээлэл цуглуулах
Тиймээс, энэ бүхнийг эхлүүлэхийн тулд би манайхыг бага зэрэг нэмсэн docker-compose.yml
:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.py
Тэнд бид өсгөж байна:
- Vertica хостоор
dwh
хамгийн анхдагч тохиргоотой, - SQL серверийн гурван тохиолдол,
- Бид мэдээллийн баазыг сүүлийнх нь зарим мэдээллээр дүүргэдэг (ямар ч тохиолдолд хайх хэрэггүй
mssql_init.py
!)
Бид өмнөхөөсөө арай илүү төвөгтэй командын тусламжтайгаар бүх сайн сайхныг эхлүүлдэг:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Бидний гайхамшгийг санамсаргүй болгогч үүсгэсэн зүйлийг та ашиглаж болно Data Profiling/Ad Hoc Query
:
Хамгийн гол нь үүнийг шинжээчдэд үзүүлэхгүй байх явдал юм
дэлгэрэнгүй ETL сессүүд Би тэгэхгүй, тэнд бүх зүйл өчүүхэн байдаг: бид суурь хийдэг, дотор нь тэмдэг байдаг, бид контекст менежерээр бүгдийг боож, одоо үүнийг хийж байна:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Цаг нь ирлээ бидний өгөгдлийг цуглуул манай нэг хагас зуун ширээнээс. Үүнийг маш мадаггүй зөв мөрүүдийн тусламжтайгаар хийцгээе.
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
- Дэгээний тусламжтайгаар бид Airflow-аас авдаг
pymssql
-холбох - Хүсэлтэд огноо хэлбэрээр хязгаарлалтыг орлуулъя - энэ нь загвар хөдөлгүүрээр функц руу хаягдах болно.
- Бидний хүсэлтийг хангаж байна
pandas
биднийг хэн авах вэDataFrame
- Энэ нь ирээдүйд бидэнд хэрэгтэй болно.
Би орлуулалтыг ашиглаж байна
{dt}
хүсэлтийн параметрийн оронд%s
Би муу Пиноккио учраас биш, харин учир ньpandas
дийлэхгүйpymssql
мөн сүүлчийнх нь хальтирдагparams: List
тэр үнэхээр хүсч байгаа ч гэсэнtuple
.
Мөн хөгжүүлэгч гэдгийг анхаарна ууpymssql
дахиж түүнийг дэмжихгүй байхаар шийдсэн бөгөөд одоо явах цаг болжээpyodbc
.
Airflow нь бидний функцүүдийн аргументуудыг юугаар дүүргэсэн болохыг харцгаая:
Хэрэв өгөгдөл байхгүй бол үргэлжлүүлэх нь утгагүй болно. Гэхдээ дүүргэлт амжилттай болсон гэж үзэх нь бас хачирхалтай юм. Гэхдээ энэ бол алдаа биш. А-аа-аа, яах вэ?! Мөн энд юу байна:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException
Airflow-т ямар ч алдаа байхгүй гэж хэлсэн ч бид даалгаврыг алгасах болно. Интерфейс нь ногоон эсвэл улаан дөрвөлжин биш, харин ягаан өнгөтэй болно.
Өгөгдлөө шидье олон багана:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])
Тухайлбал:
- Бидний захиалга авсан мэдээллийн сан,
- Манай үерийн сессийн ID (энэ нь өөр байх болно ажил бүрийн хувьд),
- Эх сурвалжаас авсан хэш ба захиалгын ID - ингэснээр эцсийн мэдээллийн санд (бүгдийг нэг хүснэгтэд цутгадаг) бид өвөрмөц захиалгын ID-тай болно.
Эцсийн өмнөх алхам хэвээр байна: бүх зүйлийг Vertica руу хийнэ. Хачирхалтай нь үүнийг хийх хамгийн гайхалтай, үр дүнтэй аргуудын нэг бол CSV юм!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
- Бид тусгай хүлээн авагч хийж байна
StringIO
. pandas
эелдэгээр тавих болноDataFrame
хэлбэрээрCSV
- шугам.- Дуртай Vertica-нхаа холболтыг дэгээгээр нээцгээе.
- Тэгээд одоо тусламжтайгаар
copy()
Манай өгөгдлийг Вертика руу шууд илгээнэ үү!
Бид жолоочоос хэдэн мөр бөглөсөнийг аваад сеансын менежерт бүх зүйл хэвийн байгааг хэлнэ.
session.loaded_rows = cursor.rowcount
session.successful = True
Тэгээд л болоо.
Борлуулалт дээр бид зорилтот хавтанг гараар бүтээдэг. Энд би өөртөө жижиг машин хийхийг зөвшөөрсөн:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
би хэрэглэж байна
VerticaOperator()
Би өгөгдлийн сангийн схем болон хүснэгтийг үүсгэдэг (хэрэв тэд аль хэдийн байхгүй бол мэдээжийн хэрэг). Хамгийн гол нь хамаарлыг зөв зохион байгуулах явдал юм.
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> load
Дуусгах
- За, - гэж бяцхан хулгана хэлэв, - тийм биш гэж үү
Намайг ойн хамгийн аймшигтай амьтан гэдэгт чи итгэлтэй байна уу?
Жулиа Дональдсон, The Gruffalo
Хэрэв миний хамт олон бид хоёрын дунд өрсөлдөөн бий болвол: хэн нь ETL процессыг эхнээс нь хурдан үүсгэж, эхлүүлэх вэ: тэд өөрсдийн SSIS, хулгана, би Airflow-тай ... Тэгээд бид засвар үйлчилгээний хялбар байдлыг харьцуулах болно ... Хөөх, би тэднийг бүх фронтоор ялна гэдэгтэй та санал нийлэх байх гэж бодож байна!
Хэрэв бага зэрэг нухацтай авч үзвэл Apache Airflow - програмын код хэлбэрээр үйл явцыг тайлбарласнаар миний ажлыг хийсэн. их илүү тав тухтай, тааламжтай.
Нэмэлт өргөтгөл болон өргөтгөх чадварын хувьд хязгааргүй өргөтгөх чадвар нь агаарын урсгалыг бараг ямар ч салбарт ашиглах боломжийг олгодог: өгөгдөл цуглуулах, бэлтгэх, боловсруулах бүх мөчлөгт, тэр ч байтугай пуужин хөөргөх (Ангараг руу мэдээж).
Эцсийн хэсэг, лавлагаа, мэдээлэл
Бид танд зориулж цуглуулсан тармуур
start_date
. Тийм ээ, энэ бол аль хэдийн орон нутгийн меме юм. Даугийн гол аргументаар дамжууланstart_date
бүгд дамждаг. Товчхондоо, хэрэв та заасан болstart_date
одоогийн огноо баschedule_interval
- нэг өдөр, дараа нь DAG маргаашнаас эрт эхлэхгүй.start_date = datetime(2020, 7, 7, 0, 1, 2)
Тэгээд өөр асуудал байхгүй.
Үүнтэй холбоотой өөр нэг ажиллах үеийн алдаа байна:
Task is missing the start_date parameter
, энэ нь таныг даг оператортой холбохоо мартсаныг илтгэдэг.- Бүгд нэг машин дээр. Тиймээ, суурь (Airflow өөрөө болон бидний бүрэх), вэб сервер, хуваарь, ажилчид. Тэгээд бүр ажилласан. Гэвч цаг хугацаа өнгөрөхөд үйлчилгээний даалгавруудын тоо нэмэгдэж, PostgreSQL нь индекст 20 мс биш 5 секундын дотор хариу өгч эхлэхэд бид үүнийг авч, авч явсан.
- Local Executor. Тийм ээ, бид үүн дээр суусаар байгаа бөгөөд бид аль хэдийн ангалын ирмэг дээр ирчихсэн байна. LocalExecutor нь бидний хувьд хангалттай байсан, гэхдээ одоо дор хаяж нэг ажилчинтай болох цаг нь болсон бөгөөд бид CeleryExecutor руу шилжихийн тулд шаргуу ажиллах хэрэгтэй болно. Та түүнтэй нэг машин дээр ажиллах боломжтой тул Celery-г сервер дээр ч ашиглахад юу ч саад болохгүй, энэ нь "мэдээжийн хэрэг хэзээ ч үйлдвэрлэлд орохгүй!"
- Ашиглахгүй суурилуулсан хэрэгслүүд:
- холболтуудын үйлчилгээний итгэмжлэлийг хадгалах,
- SLA хожигдсон Цаг хугацаанд нь хийгээгүй даалгаварт хариу өгөх,
- xcom мета өгөгдөл солилцоход зориулагдсан (би хэлсэн зорилгоөгөгдөл!) dag ажлуудын хооронд.
- Мэйл буруугаар ашиглах. За, би юу хэлэх вэ? Унасан даалгаврын бүх давталтын дохиог тохируулсан. Одоо миний ажлын Gmail-д Airflow-аас 90 мянга гаруй имэйл байгаа бөгөөд вэб шуудангийн хошуу нь нэг удаад 100 гаруйг авч устгахаас татгалзаж байна.
Илүү олон бэрхшээлүүд:
Apache Airflow Pitfails
Илүү олон автоматжуулалтын хэрэгсэл
Биднийг гараараа бус толгойгоор илүү их ажиллахын тулд Airflow бидэнд дараах зүйлийг бэлдсэн.
REST API - Тэр туршилтын статустай хэвээр байгаа нь түүнийг ажиллахад саад болохгүй. Үүний тусламжтайгаар та зөвхөн даг болон даалгаврын талаар мэдээлэл авахаас гадна даг зогсоох/эхлүүлэх, DAG Run эсвэл усан сан үүсгэх боломжтой.CLI - WebUI ашиглан ашиглахад тохиромжгүй олон хэрэгслийг тушаалын мөрөөр ашиглах боломжтой, гэхдээ ерөнхийдөө байхгүй. Жишээлбэл:backfill
даалгаврын тохиолдлуудыг дахин эхлүүлэх шаардлагатай.
Жишээлбэл, шинжээчид ирээд: "Нөхөр та 1-р сарын 13-ээс XNUMX хүртэлх тоо баримтад дэмий юм байна! Үүнийг зас, зас, зас, зас, зас!" Та бол ийм тогоо юм:airflow backfill -s '2020-01-01' -e '2020-01-13' orders
- Үндсэн үйлчилгээ:
initdb
,resetdb
,upgradedb
,checkdb
. run
, энэ нь танд нэг жишээний ажлыг гүйцэтгэх, тэр ч байтугай бүх хамаарал дээр оноо авах боломжийг олгодог. Үүнээс гадна та үүнийг дамжуулан ажиллуулж болноLocalExecutor
, хэрэв та Celery кластертай байсан ч гэсэн.- Бараг ижил зүйлийг хийдэг
test
, зөвхөн суурь дээр юу ч бичдэггүй. connections
бүрхүүлээс холболтыг бөөнөөр үүсгэх боломжийг олгодог.
python API - залгаасуудад зориулагдсан харилцан үйлчлэлийн нэлээн хатуу арга бөгөөд жижиг гараараа бөөгнөрөхгүй. Харин биднийг очиход хэн саад хийх юм/home/airflow/dags
, ажиллуулахipython
тэгээд заваарч эхлэх үү? Жишээлбэл, та бүх холболтыг дараах кодоор экспортлох боломжтой.from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d)
- Airflow мета мэдээллийн санд холбогдож байна. Би түүн рүү бичихийг зөвлөдөггүй, гэхдээ янз бүрийн тодорхой хэмжигдэхүүнүүдэд зориулсан даалгаврын төлөвийг авах нь API ашиглахаас хамаагүй хурдан бөгөөд хялбар байх болно.
Бидний бүх ажил дутуу биш, гэхдээ заримдаа унадаг, энэ нь хэвийн зүйл гэж хэлье. Гэхдээ хэд хэдэн бөглөрөл нь аль хэдийн сэжигтэй байгаа тул шалгах шаардлагатай болно.
SQL-ээс болгоомжил!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
лавлагаа
Мэдээжийн хэрэг, Google-ийн гаргасан эхний арван холбоос бол миний хавчуурга дээрх Airflow хавтасны агуулга юм.
Apache агаарын урсгалын баримт бичиг - Мэдээжийн хэрэг, бид оффисоос эхлэх ёстой. баримт бичиг, гэхдээ зааврыг хэн уншдаг вэ?Шилдэг туршлага - За, ядаж бүтээгчдийн зөвлөмжийг уншаарай.Агаарын урсгалын UI - хамгийн эхлэл: зураг дээрх хэрэглэгчийн интерфейсApache Airflow-ийн үндсэн ойлголтуудыг ойлгох - үндсэн ойлголтуудыг сайн тайлбарласан байгаа, хэрэв (гэнэт!) Хэрэв та надаас ямар нэг зүйлийг ойлгоогүй.Tianlong-ийн блог — Агаарын урсгалын сервер/кластерийг хэрхэн бүтээх гарын авлага - Агаарын урсгалын кластер байгуулах товч гарын авлага.Lyft дээр Apache агаарын урсгалыг ажиллуулж байна - бараг ижил сонирхолтой нийтлэл, магадгүй илүү формализм, цөөн жишээг эс тооцвол.Apache Airflow хэрхэн Селөдерей ажилчдын ажлын байрыг хуваарилах вэ - Селөдерейтэй хамтран ажиллах тухай.Apache Airflow дахь DAG бичих шилдэг туршлагууд - даалгаврын чадваргүй байдал, огнооны оронд ID-аар ачаалах, хувиргах, файлын бүтэц болон бусад сонирхолтой зүйлсийн талаар.Apache Airflow дахь хамаарлыг удирдах - даалгавруудын хамаарал ба триггер дүрмийн тухай миний зөвхөн дамжуулж хэлсэн.Агаарын урсгал: Таны DAG хуваариас хол хоцорсон үед - Төлөвлөгч дэх зарим "зохиогчийн дагуу ажил" -ыг хэрхэн даван туулах, алдагдсан өгөгдлийг ачаалах, ажлуудыг эрэмбэлэх.Apache Airflow-д зориулсан ашигтай SQL асуулга - Airflow мета өгөгдөлд хэрэгтэй SQL асуулга.Apache Airflow ашиглан ажлын урсгалыг хөгжүүлж эхлээрэй - захиалгат мэдрэгч үүсгэх талаар хэрэгтэй хэсэг байдаг.Presto болон Агаарын урсгалын тусламжтайгаар AWS дээр Fetchr Data Science Infra-г бүтээх - Data Science-д зориулсан AWS дээр дэд бүтцийг бий болгох тухай сонирхолтой богино тэмдэглэл.Агаарын урсгалын DAG-ыг дибаг хийх үед шалгах 7 нийтлэг алдаа - нийтлэг алдаа (хүн зааврыг уншаагүй хэвээр байх үед).Apache Airflow ашиглан нууц үгээ хадгалах, нэвтрэх - "Холболт"-ыг ашиглаж болох ч хүмүүс нууц үгээ хадгалахдаа хэрхэн дарж байгааг инээмсэглээрэй.Python-ийн Зен ба Апачигийн агаарын урсгал - далд DAG дамжуулалт, функцуудыг контекст оруулах, мөн хамаарлын тухай, мөн даалгаврыг эхлүүлэхийг алгасах тухай.Агаарын урсгал: Бага мэддэг зөвлөмж, заль мэх, шилдэг туршлагууд - хэрэглээний талаарdefault arguments
иparams
загварууд, түүнчлэн Хувьсагч ба Холболтууд.Агаарын урсгалын хуваарь гаргах - төлөвлөгч Airflow 2.0-д хэрхэн бэлдэж байгаа тухай түүх.Docker-compose дахь 3 Селөдерей ажилчидтай Apache Airflow - манай кластерыг байрлуулах тухай бага зэрэг хуучирсан нийтлэлdocker-compose
.Агаарын урсгалын контекстийг ашиглан загварчлах 4 даалгавар - загвар болон контекст дамжуулалтыг ашиглан динамик ажлууд.Агаарын урсгал дахь алдааны мэдэгдэл - мэйл болон Slack-ийн стандарт болон захиалгат мэдэгдлүүд.Агаарын урсгалын цех: таяггүй нарийн төвөгтэй DAG - Салбарлах даалгавар, макро болон XCom.
Мөн нийтлэлд ашигласан холбоосууд:
макро лавлагаа - загварт ашиглах боломжтой орлуулагч.Нийтлэг бэрхшээлүүд - Агаарын урсгал - Даг үүсгэх үед гаргадаг нийтлэг алдаа.puckel/docker-airflow: Docker Apache Airflow -docker-compose
туршилт, дибаг хийх гэх мэт.python-telegram-bot/python-telegram-bot: Бид танд татгалзаж болохгүй боодол хийсэн. — Telegram REST API-д зориулсан Python боодол.
Эх сурвалж: www.habr.com