Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Сайн байна уу, намайг Дмитрий Логвиненко гэдэг - Везет группын аналитикийн хэлтсийн өгөгдлийн инженер.

Би ETL процессыг хөгжүүлэх гайхалтай хэрэгсэл болох Apache Airflow-ийн талаар танд хэлэх болно. Гэхдээ Airflow нь маш олон талт бөгөөд олон талт тул та өгөгдлийн урсгалд оролцдоггүй ч гэсэн аливаа процессыг үе үе эхлүүлж, тэдгээрийн гүйцэтгэлийг хянах шаардлагатай байсан ч үүнийг анхааралтай авч үзэх хэрэгтэй.

Тийм ээ, би хэлэхээс гадна харуулах болно: програм нь маш олон код, дэлгэцийн агшин, зөвлөмжтэй.

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох
Google-ээс Airflow / Wikimedia Commons гэсэн үгийг ихэвчлэн хардаг

Агуулга

Танилцуулга

Apache Airflow яг л Django шиг:

  • питоноор бичсэн
  • гайхалтай админ самбар байна,
  • хязгааргүй өргөтгөх боломжтой

- зөвхөн илүү сайн, энэ нь огт өөр зорилгоор хийгдсэн, тухайлбал (катагийн өмнө бичсэнчлэн):

  • Хязгааргүй тооны машинууд дээр даалгавруудыг гүйцэтгэх, хянах (олон Селөдерей / Кубернете болон таны ухамсрын зөвшөөрлөөр)
  • Python кодыг бичих, ойлгоход маш хялбараас эхлээд динамик ажлын урсгал үүсгэх
  • бэлэн бүрэлдэхүүн хэсгүүд болон гэрийн нэмэлт өргөтгөлүүдийг ашиглан аливаа мэдээллийн сан болон API-г хооронд нь холбох чадвар (энэ нь маш энгийн).

Бид Apache Airflow-ийг дараах байдлаар ашигладаг.

  • Бид DWH болон ODS-д янз бүрийн эх сурвалжаас (олон SQL Server болон PostgreSQL жишээнүүд, програмын хэмжигдэхүүн бүхий төрөл бүрийн API, бүр 1С) өгөгдөл цуглуулдаг (бидэнд Vertica болон Clickhouse байдаг).
  • хэр дэвшилтэт cron, энэ нь ODS дээр өгөгдөл нэгтгэх процессыг эхлүүлж, мөн тэдгээрийн засвар үйлчилгээг хянадаг.

Саяхныг хүртэл бидний хэрэгцээг 32 цөм, 50 ГБ RAM бүхий нэг жижиг сервер хангадаг байсан. Airflow-д энэ нь ажилладаг:

  • дэлгэрэнгүй 200 төгрөг (үнэндээ бидний даалгавруудыг бөглөсөн ажлын урсгалууд),
  • тус бүрд дунджаар 70 даалгавар,
  • энэ сайн сайхан эхэлдэг (мөн дунджаар) цагт нэг удаа.

Бид хэрхэн өргөжсөн талаар би доор бичих болно, гэхдээ одоо бид шийдэх über-асуудлаа тодорхойлъё.

Гурван анхны SQL сервер байдаг бөгөөд тус бүр нь 50 мэдээллийн сантай - нэг төслийн жишээнүүд нь ижил бүтэцтэй (бараг хаа сайгүй, муа-ха-ха) бөгөөд энэ нь тус бүр нь Захиалгын хүснэгттэй (аз болоход ийм хүснэгттэй) гэсэн үг юм. нэрийг ямар ч бизнест оруулж болно). Бид үйлчилгээний талбаруудыг (эх сервер, эх мэдээллийн сан, ETL даалгаврын ID) нэмэх замаар өгөгдлийг авч, Vertica руу оруулдаг.

Явъя!

Үндсэн хэсэг, практик (мөн бага зэрэг онолын)

Яагаад бид (мөн та)

Мод том байхад би энгийн байсан SQL-schik нэг Оросын жижиглэнгийн худалдаанд бид ETL процессыг өгөгдлийн урсгал гэх хоёр хэрэгслийг ашиглан хууран мэхэлсэн.

  • Informatica эрчим хүчний төв - маш их тархсан, маш бүтээмжтэй, өөрийн гэсэн техник хангамжтай, өөрийн хувилбартай систем. Би түүний чадавхийн 1% -ийг бурхан хориглохыг ашигласан. Яагаад? Юуны өмнө, 380-аад оны үеийн энэ интерфейс бидэнд оюун санааны хувьд дарамт учруулсан. Хоёрдугаарт, энэхүү төхөөрөмж нь маш гоёмсог процессууд, бүрэлдэхүүн хэсгүүдийн уур хилэнг дахин ашиглах болон бусад маш чухал аж ахуйн нэгжийн заль мэхэнд зориулагдсан болно. Энэ нь Airbus AXNUMX онгоцны далавч шиг үнэтэй байдаг талаар бид юу ч хэлэхгүй.

    Скриншот нь 30-аас доош насны хүмүүст бага зэрэг хор хөнөөл учруулж болзошгүйг анхаараарай

    Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

  • SQL Server Integration Server - Бид энэ нөхрийг төслийн дотоод урсгалдаа ашигласан. Үнэн хэрэгтээ: бид SQL Server-ийг аль хэдийн ашиглаж байгаа бөгөөд түүний ETL хэрэгслийг ашиглахгүй байх нь ямар нэгэн үндэслэлгүй байх болно. Энд байгаа бүх зүйл сайн байна: интерфэйс хоёулаа үзэсгэлэнтэй, явцын тайлангууд ... Гэхдээ энэ нь бид програм хангамжийн бүтээгдэхүүнд дуртай учраас биш юм. Хувилбар dtsx (энэ нь хадгалагдахдаа холилдсон зангилаатай XML юм) бид чадна, гэхдээ ямар учиртай вэ? Хэдэн зуун хүснэгтийг нэг серверээс нөгөө сервер рүү чирэх ажлын багц хийвэл ямар вэ? Тийм ээ, хэдэн зуун вэ, хулганы товчийг дарахад таны долоовор хуруу хорин ширхэгээс унах болно. Гэхдээ энэ нь мэдээж илүү загварлаг харагдаж байна:

    Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Бид гарцыг хайж байсан нь гарцаагүй. Бүр тохиолдол бараг л өөрөө бичсэн SSIS багц үүсгэгч дээр ирсэн ...

... дараа нь надад шинэ ажил олдлоо. Тэгээд Apache Airflow намайг гүйцэж түрүүлэв.

ETL процессын тайлбар нь энгийн Python код гэдгийг мэдээд би зүгээр л баярласандаа бүжиглээгүй. Ингэж өгөгдлийн урсгалуудыг хувилбарлаж, ялгаж, олон зуун мэдээллийн сангаас нэг бүтэцтэй хүснэгтүүдийг нэг зорилт болгон цутгах нь нэг ба хагас эсвэл хоёр 13 ”дэлгэцэнд Python кодын асуудал болсон.

Кластерийг угсарч байна

Цэцэрлэгийг бүрэн зохион байгуулахгүй байцгаая, Airflow, таны сонгосон мэдээллийн сан, Селөдерей болон усан онгоцны зогсоол дээр тайлбарласан бусад тохиолдлуудыг суулгах гэх мэт тодорхой зүйлийг энд ярихгүй байцгаая.

Бид нэн даруй туршилтаа эхлүүлэхийн тулд би зурсан docker-compose.yml Үүнд:

  • Үнэндээ өсгөцгөөе Агаарын урсгал: Хуваарьлагч, вэб сервер. Цэцэг мөн тэнд эргэлдэж, Селөдерей даалгаврыг хянах болно (учир нь үүнийг аль хэдийн оруулчихсан apache/airflow:1.10.10-python3.7, гэхдээ бид дургүйцэхгүй)
  • PostgreSQL, үүнд Airflow өөрийн үйлчилгээний мэдээллээ (хуваарьлагчийн өгөгдөл, гүйцэтгэлийн статистик гэх мэт) бичих ба Celery дууссан ажлуудыг тэмдэглэх болно;
  • Redis, энэ нь Celery-ийн даалгавар зуучлагчийн үүрэг гүйцэтгэнэ;
  • Селөдерей ажилтан, даалгаврыг шууд гүйцэтгэх ажилд оролцоно.
  • Хавтас руу ./dags бид dags-ийн тайлбар бүхий файлуудаа нэмнэ. Тэднийг нисэх үед авах тул найтаах бүрийн дараа бүх стекийг жонглёрлох шаардлагагүй болно.

Зарим газарт жишээн дэх кодыг бүрэн харуулаагүй (текстийг эмх замбараагүй болгохгүйн тулд), гэхдээ хаа нэгтээ процессын явцад өөрчилсөн байдаг. Ажлын кодын бүрэн жишээг агуулахаас олж болно https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Тэмдэглэл:

  • Зохиолыг угсарч байхдаа би сайн мэддэг дүр төрхөд тулгуурласан puckel/docker-airflow - заавал шалгаарай. Магадгүй таны амьдралд өөр юу ч хэрэггүй байх.
  • Агаарын урсгалын бүх тохиргоог зөвхөн дамжуулан авах боломжтой airflow.cfg, гэхдээ бас орчны хувьсагчаар дамжуулан (хөгжүүлэгчдийн ачаар) үүнийг би хорлонтойгоор ашигласан.
  • Мэдээжийн хэрэг, энэ нь үйлдвэрлэхэд бэлэн биш байна: би чингэлэг дээр зүрхний цохилтыг санаатайгаар тавиагүй, аюулгүй байдлын талаар санаа зовоогүй. Гэхдээ би манай туршилтанд тохирох хамгийн бага зүйлийг хийсэн.
  • Тэрийг тэмдэглэ:
    • Dag хавтас нь төлөвлөгч болон ажилчдын аль алинд нь хандах боломжтой байх ёстой.
    • Гуравдагч талын бүх номын санд мөн адил хамаарна - тэдгээрийг бүгдийг нь хуваарьлагч, ажилчидтай машин дээр суулгасан байх ёстой.

За, одоо бүх зүйл энгийн:

$ docker-compose up --scale worker=3

Бүх зүйл нэмэгдсэний дараа та вэб интерфэйсүүдийг харж болно:

Үндсэн ухагдахуунууд

Хэрэв та эдгээр бүх "даг"-аас юу ч ойлгоогүй бол энд товч толь бичиг байна.

  • Жагсаалт - Airflow-ын хамгийн чухал авга ах, хүн биш, роботууд шаргуу ажилладагийг хянадаг: цагийн хуваарийг хянаж, даалгавруудыг шинэчилж, даалгавруудыг эхлүүлдэг.

    Ерөнхийдөө хуучин хувилбаруудад тэрээр санах ойтой холбоотой асуудалтай байсан (үгүй, амнези биш, харин гоожсон) бөгөөд хуучин параметр нь тохиргоонд хэвээр үлджээ. run_duration - түүний дахин эхлүүлэх интервал. Харин одоо бүх зүйл сайхан байна.

  • DAG ("Dag" гэх мэт) - "чиглүүлсэн мөчлөгийн график", гэхдээ ийм тодорхойлолт нь цөөхөн хүнд хэлэх болно, гэхдээ үнэн хэрэгтээ энэ нь бие биетэйгээ харьцах даалгаврын сав (доороос үзнэ үү) эсвэл SSIS дахь багц ба Informatica дахь ажлын урсгалын аналог юм. .

    Дагнаас гадна дэд дагзууд байж болох ч бид тэдэнд хүрч чадахгүй байх магадлалтай.

  • DAG Run - өөрийн гэсэн томилогдсон анхны даг execution_date. Ижил дагны дагрангууд зэрэгцэн ажиллах боломжтой (хэрэв та даалгавраа сулруулсан бол мэдээж хэрэг).
  • Оператор нь тодорхой үйлдлийг гүйцэтгэх үүрэгтэй кодын хэсгүүд юм. Гурван төрлийн оператор байдаг:
    • үйл ажиллагаабидний дуртай шиг PythonOperator, ямар ч (хүчинтэй) Python кодыг ажиллуулж болно;
    • шилжүүлэх, мэдээллийг нэг газраас нөгөө рүү зөөвөрлөж байна MsSqlToHiveTransfer;
    • мэдрэгч нөгөө талаас, энэ нь танд ямар нэгэн үйл явдал гарах хүртэл хариу үйлдэл үзүүлэх эсвэл цаашдын гүйцэтгэлийг удаашруулах боломжийг олгоно. HttpSensor заасан төгсгөлийн цэгийг татах боломжтой бөгөөд хүссэн хариу хүлээж байх үед шилжүүлгийг эхлүүлнэ GoogleCloudStorageToS3Operator. Сонирхолтой оюун ухаан: "Яагаад? Эцсийн эцэст та оператор дээр давталт хийж болно!" Дараа нь түр зогсоосон операторуудтай даалгаврын санг бөглөхгүйн тулд. Мэдрэгч нь дараагийн оролдлогын өмнө ажиллаж, шалгаж, үхдэг.
  • Даалгавар - зарлагдсан операторууд, төрөл харгалзахгүй, дагшинд хавсаргасан албан тушаалын зэрэглэлд дэвшинэ.
  • даалгаврын жишээ - ерөнхий төлөвлөгч үүрэг гүйцэтгэгч-ажилчид руу даалгавраа илгээх цаг болсон гэж шийдсэн үед (хэрэв бид ашиглаж байгаа бол газар дээр нь) LocalExecutor эсвэл тохиолдолд алсын зангилаа руу CeleryExecutor), энэ нь тэдгээрт контекстийг оноож (өөрөөр хэлбэл, хувьсагчдын багц - гүйцэтгэлийн параметрүүд), тушаал эсвэл асуулгын загваруудыг өргөжүүлж, тэдгээрийг нэгтгэдэг.

Бид даалгавар үүсгэдэг

Эхлээд бид догдлогынхоо ерөнхий схемийг тоймлон авч үзье, дараа нь бид зарим нэг энгийн бус шийдлүүдийг ашигладаг тул нарийвчилсан мэдээллийг нарийвчлан судлах болно.

Тиймээс, хамгийн энгийн хэлбэрээр ийм ганга дараах байдлаар харагдах болно.

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Үүнийг юу болохыг мэдье.

  • Нэгдүгээрт, бид шаардлагатай libs болон импортолдог Өөр ямар нэг зүйл;
  • sql_server_ds Байна уу List[namedtuple[str, str]] Агаарын урсгалын холболтын холболтын нэрс болон бидний хавтанг авах мэдээллийн сангуудын хамт;
  • dag - заавал байх ёстой манай дааганы зарлал globals(), эс бөгөөс Airflow үүнийг олохгүй. Даг мөн хэлэх хэрэгтэй:
    • түүний нэр хэн бэ orders - энэ нэр дараа нь вэб интерфэйс дээр гарч ирнэ,
    • XNUMX-р сарын XNUMX-ны шөнө дундаас эхлэн ажиллана гэж
    • мөн энэ нь ойролцоогоор 6 цаг тутамд ажиллах ёстой (энд хатуу ширүүн залуусын оронд timedelta() зөвшөөрөгдөх cron- шугам 0 0 0/6 ? * * *, бага дажгүй нь - гэх мэт илэрхийлэл @daily);
  • workflow() үндсэн ажлаа хийх болно, гэхдээ одоо биш. Одоохондоо бид контекстээ л лог руу оруулах болно.
  • Одоо даалгавар үүсгэх энгийн ид шид:
    • бид эх сурвалжаа дамжуулдаг;
    • эхлүүлэх PythonOperator, энэ нь бидний даммиг гүйцэтгэх болно workflow(). Даалгаврын өвөрмөц (даг дотор) нэрийг зааж өгөхөө бүү мартаарай. туг provide_context эргээд функцэд нэмэлт аргументуудыг оруулах бөгөөд бид үүнийг ашиглан анхааралтай цуглуулах болно **context.

Одоохондоо энэ л байна. Бидний авсан зүйл:

  • вэб интерфэйс дэх шинэ догол,
  • Зэрэгцээ хийгдэх нэг хагас зуун даалгавар (хэрэв Агаарын урсгал, Селөдерей тохиргоо болон серверийн хүчин чадал зөвшөөрвөл).

За бараг ойлголоо.

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох
Хэн хамаарлыг суулгах вэ?

Энэ бүх зүйлийг хялбарчлахын тулд би шургалсан docker-compose.yml боловсруулах requirements.txt бүх зангилаа дээр.

Одоо алга болсон:

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Саарал дөрвөлжин нь хуваарь гаргагчийн боловсруулсан даалгавар юм.

Бид бага зэрэг хүлээж байна, ажилчид даалгавруудыг хураан авч байна:

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Ногоонууд маань мэдээж ажлаа амжилттай дуусгасан. Улаанууд тийм ч амжилттай байдаггүй.

Дашрамд хэлэхэд манай бүтээгдэхүүн дээр хавтас байхгүй ./dags, машинуудын хооронд ямар ч синхрончлол байхгүй - бүх дагууд оршдог git Манай Gitlab дээр байдаг бөгөөд Gitlab CI нь нэгдэх үед машинуудад шинэчлэлтүүдийг түгээдэг master.

Цэцгийн тухай бага зэрэг

Ажилчид бидний соосгийг цохиж байх хооронд бидэнд ямар нэгэн зүйл харуулах өөр нэг хэрэгсэл болох Цэцэгийг санацгаая.

Ажилчдын зангилааны талаархи хураангуй мэдээлэл бүхий эхний хуудас:

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Ажилд орсон ажлуудтай хамгийн эрчимтэй хуудас:

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Манай брокерын статустай хамгийн уйтгартай хуудас:

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Хамгийн тод хуудас нь даалгаврын статусын графикууд болон тэдгээрийн гүйцэтгэлийн хугацаа юм.

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Бид дутуу ачааллыг ачаалдаг

Тиймээс бүх даалгавар биелсэн тул та шархадсан хүмүүсийг авч явах боломжтой.

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Нэг шалтгааны улмаас шархадсан олон хүн байсан. Агаарын урсгалыг зөв ашиглах тохиолдолд эдгээр квадратууд нь өгөгдөл ирээгүй гэдгийг харуулж байна.

Та бүртгэлийг үзэж, унасан даалгаврын тохиолдлуудыг дахин эхлүүлэх хэрэгтэй.

Аль ч дөрвөлжин дээр дарснаар бид хийх боломжтой үйлдлүүдийг харах болно:

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Та унасаныг аваад Clear хийж болно. Өөрөөр хэлбэл, бид тэнд ямар нэг зүйл бүтэлгүйтсэн гэдгийг мартаж, ижил төстэй даалгавар нь төлөвлөгч рүү очих болно.

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Үүнийг бүх улаан дөрвөлжинтэй хулганаар хийх нь тийм ч хүмүүнлэг биш гэдэг нь тодорхой байна - энэ нь бидний Airflow-аас хүлээж байгаа зүйл биш юм. Мэдээжийн хэрэг, бид үй олноор хөнөөх зэвсэгтэй: Browse/Task Instances

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Бүгдийг нэг дор сонгоод тэг болгоод зөв зүйл дээр дарна уу:

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Цэвэрлэгээ хийсний дараа манай таксинууд иймэрхүү харагдаж байна (тэд хуваарь гаргагчийг хуваарь гаргахыг хүлээж байна):

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Холболт, дэгээ болон бусад хувьсагч

Дараагийн DAG-ыг харах цаг болжээ. update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Хүн бүр тайлангийн шинэчлэл хийж байсан уу? Энэ бол түүний дахин юм: өгөгдлийг хаанаас авах эх сурвалжуудын жагсаалт байдаг; хаана байрлуулах жагсаалт байдаг; Бүх зүйл тохиолдсон эсвэл эвдэрсэн үед дохио өгөхөө бүү мартаарай (за, энэ бидний тухай биш, үгүй).

Файлыг дахин үзэж, шинэ ойлгомжгүй зүйлсийг харцгаая:

  • from commons.operators import TelegramBotSendMessage - Unblocked руу мессеж илгээх жижиг савлагаа хийснээр бид өөрсдийн операторуудыг бий болгоход юу ч саад болохгүй. (Бид энэ операторын талаар доор дэлгэрэнгүй ярих болно);
  • default_args={} - dag нь бүх операторууддаа ижил аргументуудыг тарааж чаддаг;
  • to='{{ var.value.all_the_kings_men }}' - талбай to Бид хатуу кодчилолгүй, харин Жинжа ашиглан динамикаар үүсгэгдсэн, имэйлийн жагсаалт бүхий хувьсагчтай байх болно. Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — операторыг эхлүүлэх нөхцөл. Манай тохиолдолд бүх хамаарал шийдэгдсэн тохиолдолд л захидал дарга нарт очих болно амжилттай;
  • tg_bot_conn_id='tg_main' - аргументууд conn_id бидний үүсгэсэн холболтын ID-г хүлээн авна уу Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Telegram дахь мессежүүд унасан даалгавар байгаа тохиолдолд л алга болно;
  • task_concurrency=1 - бид нэг даалгаврын хэд хэдэн даалгаврыг нэгэн зэрэг эхлүүлэхийг хориглодог. Үгүй бол бид хэд хэдэн төхөөрөмжийг нэгэн зэрэг эхлүүлэх болно VerticaOperator (нэг ширээ рүү харах);
  • report_update >> [email, tg] - бүгд VerticaOperator дараах байдлаар захидал, мессеж илгээхдээ нэгдэх:
    Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

    Гэхдээ мэдэгдэгч операторууд ажиллуулах нөхцөл өөр өөр байдаг тул зөвхөн нэг нь л ажиллах болно. Модны харагдац дээр бүх зүйл арай бага харагдахуйц харагдаж байна:
    Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

талаар хэдэн үг хэлье макро болон тэдний найзууд - хувьсагчууд.

Макро нь янз бүрийн хэрэгтэй мэдээллийг операторын аргумент болгон орлуулж чадах Жинжа орлуулагч юм. Жишээлбэл, иймэрхүү:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} контекст хувьсагчийн агуулга руу тэлэх болно execution_date форматаар YYYY-MM-DD: 2020-07-14. Хамгийн сайн тал нь контекст хувьсагчдыг тодорхой даалгаврын жишээнд (Мод харах дахь дөрвөлжин) хадаж байгаа бөгөөд дахин эхлүүлэх үед орлуулагчид ижил утгатай болж өргөжих болно.

Даалгаврын жишээ бүр дээрх Rendered товчийг ашиглан оноож буй утгуудыг харж болно. Захидал илгээх даалгавар нь дараах байдалтай байна.

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Мессеж илгээх даалгавар дээр:

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Хамгийн сүүлийн хувилбарт зориулсан макроны бүрэн жагсаалтыг эндээс авах боломжтой: макро лавлагаа

Түүнээс гадна, залгаасуудын тусламжтайгаар бид өөрсдийн макро зарлах боломжтой, гэхдээ энэ бол өөр түүх юм.

Урьдчилан тодорхойлсон зүйлсээс гадна бид хувьсагчдынхаа утгыг орлуулж болно (би үүнийг дээрх кодонд аль хэдийн ашигласан). -д бүтээцгээе Admin/Variables хэд хэдэн зүйл:

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Таны ашиглаж болох бүх зүйл:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Утга нь скаляр эсвэл JSON байж болно. JSON тохиолдолд:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

Хүссэн түлхүүр рүү хүрэх замыг ашиглана уу: {{ var.json.bot_config.bot.token }}.

Би шууд утгаараа нэг үг хэлж, нэг дэлгэцийн агшинг үзүүлэх болно холболтууд. Энд бүх зүйл энгийн байдаг: хуудсан дээр Admin/Connections Бид холболт үүсгэж, нэвтрэх / нууц үг болон илүү тодорхой параметрүүдийг нэмж оруулдаг. Үүн шиг:

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Нууц үгийг шифрлэх боломжтой (өгөгдмөлөөс илүү нарийн) эсвэл та холболтын төрлийг орхиж болно (миний хийсэн шиг) tg_main) - Үнэн хэрэгтээ эдгээр төрлүүдийн жагсаалтыг Airflow загварт холбосон бөгөөд эх код руу орохгүйгээр өргөтгөх боломжгүй (хэрэв би гэнэт Google-д ямар нэгэн зүйл оруулаагүй бол намайг засна уу), гэхдээ биднийг кредит авахад юу ч саад болохгүй. нэр.

Та мөн ижил нэртэй хэд хэдэн холболт хийж болно: энэ тохиолдолд арга BaseHook.get_connection(), аль нь бидэнд нэрээр холболтуудыг авдаг, өгөх болно Санамсаргүй хэд хэдэн нэрсээс (Round Robin хийх нь илүү логик байх болно, гэхдээ үүнийг Airflow хөгжүүлэгчдийн ухамсарт үлдээе).

Хувьсагч ба Холболтууд нь мэдээж гайхалтай хэрэглүүр боловч тэнцвэрээ алдахгүй байх нь чухал: та урсгалынхаа аль хэсгийг кодонд хадгалдаг, мөн Airflow-д аль хэсгийг нь хадгалахад зориулж өгдөг. Нэг талаас, утгыг, жишээлбэл, шуудангийн хайрцгийг UI ашиглан хурдан өөрчлөхөд тохиромжтой байж болно. Нөгөөтэйгүүр, энэ нь бид (би) салахыг хүссэн хулганы товшилт руу буцах явдал хэвээр байна.

Холболттой ажиллах нь ажлын нэг юм дэгээ. Ерөнхийдөө Airflow дэгээ нь үүнийг гуравдагч талын үйлчилгээ, номын сантай холбох цэг юм. Жишээ нь, JiraHook Jira-тай харилцах үйлчлүүлэгч нээх болно (та даалгавруудыг нааш цааш зөөх боломжтой), мөн SambaHook та дотоод файлыг түлхэж болно smb- цэг.

Захиалгат операторыг задлан шинжилж байна

Тэгээд бид үүнийг хэрхэн хийснийг харах дөхсөн TelegramBotSendMessage

Хууль commons/operators.py бодит оператортой:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Энд, Airflow-ийн бусад бүх зүйлийн нэгэн адил бүх зүйл маш энгийн:

  • -аас өвлөн авсан BaseOperator, энэ нь Агаарын урсгалын онцлогтой хэд хэдэн зүйлийг хэрэгжүүлдэг (чөлөөт цагаа хараарай)
  • Зарлагдсан талбарууд template_fields, үүнд Жинжа боловсруулах макро хайх болно.
  • Зөв аргументуудыг зохион байгуулсан __init__(), шаардлагатай бол анхдагч тохиргоог тохируулна уу.
  • Өвөг дээдсийн эхлэлийг ч бид мартаагүй.
  • Холбогдох дэгээг нээв TelegramBotHookтүүнээс үйлчлүүлэгчийн объектыг хүлээн авсан.
  • Дарагдсан (дахин тодорхойлсон) арга BaseOperator.execute(), операторыг ажиллуулах цаг ирэхэд Airfow нь тайрах болно - үүнд бид нэвтрэхээ мартаж үндсэн үйлдлийг хэрэгжүүлэх болно. (Дашрамд хэлэхэд бид шууд нэвтэрдэг stdout и stderr - Агаарын урсгал нь бүх зүйлийг таслан зогсоож, сайхан боож, шаардлагатай бол задлах болно.)

Бидэнд юу байгааг харцгаая commons/hooks.py. Файлын эхний хэсэг, дэгээ өөрөө:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Би энд юу тайлбарлахаа ч мэдэхгүй байна, би зөвхөн чухал зүйлийг тэмдэглэх болно:

  • Бид өвлөн авч, аргументуудын талаар бодож үзээрэй - ихэнх тохиолдолд энэ нь нэг байх болно: conn_id;
  • Стандарт аргуудыг давах: Би өөрийгөө хязгаарласан get_conn(), үүнд би холболтын параметрүүдийг нэрээр нь аваад зүгээр л хэсгийг авна extra (энэ бол JSON талбар), би (өөрийн зааврын дагуу!) Telegram бот жетоныг байрлуулсан: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Би бидний жишээг бий болгодог TelegramBot, тодорхой тэмдэг өгөх.

Тэгээд л болоо. Та дэгээ ашиглан үйлчлүүлэгч авч болно TelegramBotHook().clent буюу TelegramBotHook().get_conn().

Файлын хоёр дахь хэсэг нь би Telegram REST API-д зориулж бичил боодол хийдэг бөгөөд үүнийг чирэхгүй байхын тулд python-telegram-bot нэг аргын хувьд sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Зөв арга бол бүгдийг нь нэмэх явдал юм: TelegramBotSendMessage, TelegramBotHook, TelegramBot - залгаас дээр нийтийн репозитор руу оруулаад Нээлттэй эх рүү өгнө үү.

Бид энэ бүхнийг судалж байх хооронд манай тайлангийн шинэчлэл амжилтгүй болж, сувагт алдааны мессеж илгээсэн. Буруу эсэхийг шалгах гэж байна...

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох
Манай нохойд ямар нэг зүйл эвдэрсэн! Энэ нь бидний хүлээж байсан зүйл биш гэж үү? Яг!

Та цутгах гэж байна уу?

Намайг ямар нэг зүйл алдсан гэж бодож байна уу? SQL Server-ээс Vertica руу өгөгдөл дамжуулна гэж амлаад аваад сэдвээсээ холдсон бололтой, новш!

Энэ харгислал санаатай байсан тул би танд зориулж зарим нэр томъёог тайлах хэрэгтэй болсон. Одоо та цааш явж болно.

Бидний төлөвлөгөө ийм байсан:

  1. Хий
  2. Даалгавруудыг бий болгох
  3. Бүх зүйл ямар үзэсгэлэнтэй болохыг хараарай
  4. Бөглөхөд сессийн дугаар оноо
  5. SQL серверээс өгөгдөл авах
  6. Vertica руу өгөгдөл оруулах
  7. Статистик мэдээлэл цуглуулах

Тиймээс, энэ бүхнийг эхлүүлэхийн тулд би манайхыг бага зэрэг нэмсэн docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Тэнд бид өсгөж байна:

  • Vertica хостоор dwh хамгийн анхдагч тохиргоотой,
  • SQL серверийн гурван тохиолдол,
  • Бид мэдээллийн баазыг сүүлийнх нь зарим мэдээллээр дүүргэдэг (ямар ч тохиолдолд хайх хэрэггүй mssql_init.py!)

Бид өмнөхөөсөө арай илүү төвөгтэй командын тусламжтайгаар бүх сайн сайхныг эхлүүлдэг:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Бидний гайхамшгийг санамсаргүй болгогч үүсгэсэн зүйлийг та ашиглаж болно Data Profiling/Ad Hoc Query:

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох
Хамгийн гол нь үүнийг шинжээчдэд үзүүлэхгүй байх явдал юм

дэлгэрэнгүй ETL сессүүд Би тэгэхгүй, тэнд бүх зүйл өчүүхэн байдаг: бид суурь хийдэг, дотор нь тэмдэг байдаг, бид контекст менежерээр бүгдийг боож, одоо үүнийг хийж байна:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Цаг нь ирлээ бидний өгөгдлийг цуглуул манай нэг хагас зуун ширээнээс. Үүнийг маш мадаггүй зөв мөрүүдийн тусламжтайгаар хийцгээе.

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Дэгээний тусламжтайгаар бид Airflow-аас авдаг pymssql-холбох
  2. Хүсэлтэд огноо хэлбэрээр хязгаарлалтыг орлуулъя - энэ нь загвар хөдөлгүүрээр функц руу хаягдах болно.
  3. Бидний хүсэлтийг хангаж байна pandasбиднийг хэн авах вэ DataFrame - Энэ нь ирээдүйд бидэнд хэрэгтэй болно.

Би орлуулалтыг ашиглаж байна {dt} хүсэлтийн параметрийн оронд %s Би муу Пиноккио учраас биш, харин учир нь pandas дийлэхгүй pymssql мөн сүүлчийнх нь хальтирдаг params: Listтэр үнэхээр хүсч байгаа ч гэсэн tuple.
Мөн хөгжүүлэгч гэдгийг анхаарна уу pymssql дахиж түүнийг дэмжихгүй байхаар шийдсэн бөгөөд одоо явах цаг болжээ pyodbc.

Airflow нь бидний функцүүдийн аргументуудыг юугаар дүүргэсэн болохыг харцгаая:

Apache агаарын урсгал: ETL-ийг илүү хялбар болгох

Хэрэв өгөгдөл байхгүй бол үргэлжлүүлэх нь утгагүй болно. Гэхдээ дүүргэлт амжилттай болсон гэж үзэх нь бас хачирхалтай юм. Гэхдээ энэ бол алдаа биш. А-аа-аа, яах вэ?! Мөн энд юу байна:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException Airflow-т ямар ч алдаа байхгүй гэж хэлсэн ч бид даалгаврыг алгасах болно. Интерфейс нь ногоон эсвэл улаан дөрвөлжин биш, харин ягаан өнгөтэй болно.

Өгөгдлөө шидье олон багана:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Тухайлбал:

  • Бидний захиалга авсан мэдээллийн сан,
  • Манай үерийн сессийн ID (энэ нь өөр байх болно ажил бүрийн хувьд),
  • Эх сурвалжаас авсан хэш ба захиалгын ID - ингэснээр эцсийн мэдээллийн санд (бүгдийг нэг хүснэгтэд цутгадаг) бид өвөрмөц захиалгын ID-тай болно.

Эцсийн өмнөх алхам хэвээр байна: бүх зүйлийг Vertica руу хийнэ. Хачирхалтай нь үүнийг хийх хамгийн гайхалтай, үр дүнтэй аргуудын нэг бол CSV юм!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Бид тусгай хүлээн авагч хийж байна StringIO.
  2. pandas эелдэгээр тавих болно DataFrame хэлбэрээр CSV- шугам.
  3. Дуртай Vertica-нхаа холболтыг дэгээгээр нээцгээе.
  4. Тэгээд одоо тусламжтайгаар copy() Манай өгөгдлийг Вертика руу шууд илгээнэ үү!

Бид жолоочоос хэдэн мөр бөглөсөнийг аваад сеансын менежерт бүх зүйл хэвийн байгааг хэлнэ.

session.loaded_rows = cursor.rowcount
session.successful = True

Тэгээд л болоо.

Борлуулалт дээр бид зорилтот хавтанг гараар бүтээдэг. Энд би өөртөө жижиг машин хийхийг зөвшөөрсөн:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

би хэрэглэж байна VerticaOperator() Би өгөгдлийн сангийн схем болон хүснэгтийг үүсгэдэг (хэрэв тэд аль хэдийн байхгүй бол мэдээжийн хэрэг). Хамгийн гол нь хамаарлыг зөв зохион байгуулах явдал юм.

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Дуусгах

- За, - гэж бяцхан хулгана хэлэв, - тийм биш гэж үү
Намайг ойн хамгийн аймшигтай амьтан гэдэгт чи итгэлтэй байна уу?

Жулиа Дональдсон, The Gruffalo

Хэрэв миний хамт олон бид хоёрын дунд өрсөлдөөн бий болвол: хэн нь ETL процессыг эхнээс нь хурдан үүсгэж, эхлүүлэх вэ: тэд өөрсдийн SSIS, хулгана, би Airflow-тай ... Тэгээд бид засвар үйлчилгээний хялбар байдлыг харьцуулах болно ... Хөөх, би тэднийг бүх фронтоор ялна гэдэгтэй та санал нийлэх байх гэж бодож байна!

Хэрэв бага зэрэг нухацтай авч үзвэл Apache Airflow - програмын код хэлбэрээр үйл явцыг тайлбарласнаар миний ажлыг хийсэн. их илүү тав тухтай, тааламжтай.

Нэмэлт өргөтгөл болон өргөтгөх чадварын хувьд хязгааргүй өргөтгөх чадвар нь агаарын урсгалыг бараг ямар ч салбарт ашиглах боломжийг олгодог: өгөгдөл цуглуулах, бэлтгэх, боловсруулах бүх мөчлөгт, тэр ч байтугай пуужин хөөргөх (Ангараг руу мэдээж).

Эцсийн хэсэг, лавлагаа, мэдээлэл

Бид танд зориулж цуглуулсан тармуур

  • start_date. Тийм ээ, энэ бол аль хэдийн орон нутгийн меме юм. Даугийн гол аргументаар дамжуулан start_date бүгд дамждаг. Товчхондоо, хэрэв та заасан бол start_date одоогийн огноо ба schedule_interval - нэг өдөр, дараа нь DAG маргаашнаас эрт эхлэхгүй.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Тэгээд өөр асуудал байхгүй.

    Үүнтэй холбоотой өөр нэг ажиллах үеийн алдаа байна: Task is missing the start_date parameter, энэ нь таныг даг оператортой холбохоо мартсаныг илтгэдэг.

  • Бүгд нэг машин дээр. Тиймээ, суурь (Airflow өөрөө болон бидний бүрэх), вэб сервер, хуваарь, ажилчид. Тэгээд бүр ажилласан. Гэвч цаг хугацаа өнгөрөхөд үйлчилгээний даалгавруудын тоо нэмэгдэж, PostgreSQL нь индекст 20 мс биш 5 секундын дотор хариу өгч эхлэхэд бид үүнийг авч, авч явсан.
  • Local Executor. Тийм ээ, бид үүн дээр суусаар байгаа бөгөөд бид аль хэдийн ангалын ирмэг дээр ирчихсэн байна. LocalExecutor нь бидний хувьд хангалттай байсан, гэхдээ одоо дор хаяж нэг ажилчинтай болох цаг нь болсон бөгөөд бид CeleryExecutor руу шилжихийн тулд шаргуу ажиллах хэрэгтэй болно. Та түүнтэй нэг машин дээр ажиллах боломжтой тул Celery-г сервер дээр ч ашиглахад юу ч саад болохгүй, энэ нь "мэдээжийн хэрэг хэзээ ч үйлдвэрлэлд орохгүй!"
  • Ашиглахгүй суурилуулсан хэрэгслүүд:
    • холболтуудын үйлчилгээний итгэмжлэлийг хадгалах,
    • SLA хожигдсон Цаг хугацаанд нь хийгээгүй даалгаварт хариу өгөх,
    • xcom мета өгөгдөл солилцоход зориулагдсан (би хэлсэн зорилгоөгөгдөл!) dag ажлуудын хооронд.
  • Мэйл буруугаар ашиглах. За, би юу хэлэх вэ? Унасан даалгаврын бүх давталтын дохиог тохируулсан. Одоо миний ажлын Gmail-д Airflow-аас 90 мянга гаруй имэйл байгаа бөгөөд вэб шуудангийн хошуу нь нэг удаад 100 гаруйг авч устгахаас татгалзаж байна.

Илүү олон бэрхшээлүүд: Apache Airflow Pitfails

Илүү олон автоматжуулалтын хэрэгсэл

Биднийг гараараа бус толгойгоор илүү их ажиллахын тулд Airflow бидэнд дараах зүйлийг бэлдсэн.

  • REST API - Тэр туршилтын статустай хэвээр байгаа нь түүнийг ажиллахад саад болохгүй. Үүний тусламжтайгаар та зөвхөн даг болон даалгаврын талаар мэдээлэл авахаас гадна даг зогсоох/эхлүүлэх, DAG Run эсвэл усан сан үүсгэх боломжтой.
  • CLI - WebUI ашиглан ашиглахад тохиромжгүй олон хэрэгслийг тушаалын мөрөөр ашиглах боломжтой, гэхдээ ерөнхийдөө байхгүй. Жишээлбэл:
    • backfill даалгаврын тохиолдлуудыг дахин эхлүүлэх шаардлагатай.
      Жишээлбэл, шинжээчид ирээд: "Нөхөр та 1-р сарын 13-ээс XNUMX хүртэлх тоо баримтад дэмий юм байна! Үүнийг зас, зас, зас, зас, зас!" Та бол ийм тогоо юм:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Үндсэн үйлчилгээ: initdb, resetdb, upgradedb, checkdb.
    • run, энэ нь танд нэг жишээний ажлыг гүйцэтгэх, тэр ч байтугай бүх хамаарал дээр оноо авах боломжийг олгодог. Үүнээс гадна та үүнийг дамжуулан ажиллуулж болно LocalExecutor, хэрэв та Celery кластертай байсан ч гэсэн.
    • Бараг ижил зүйлийг хийдэг test, зөвхөн суурь дээр юу ч бичдэггүй.
    • connections бүрхүүлээс холболтыг бөөнөөр үүсгэх боломжийг олгодог.
  • python API - залгаасуудад зориулагдсан харилцан үйлчлэлийн нэлээн хатуу арга бөгөөд жижиг гараараа бөөгнөрөхгүй. Харин биднийг очиход хэн саад хийх юм /home/airflow/dags, ажиллуулах ipython тэгээд заваарч эхлэх үү? Жишээлбэл, та бүх холболтыг дараах кодоор экспортлох боломжтой.
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Airflow мета мэдээллийн санд холбогдож байна. Би түүн рүү бичихийг зөвлөдөггүй, гэхдээ янз бүрийн тодорхой хэмжигдэхүүнүүдэд зориулсан даалгаврын төлөвийг авах нь API ашиглахаас хамаагүй хурдан бөгөөд хялбар байх болно.

    Бидний бүх ажил дутуу биш, гэхдээ заримдаа унадаг, энэ нь хэвийн зүйл гэж хэлье. Гэхдээ хэд хэдэн бөглөрөл нь аль хэдийн сэжигтэй байгаа тул шалгах шаардлагатай болно.

    SQL-ээс болгоомжил!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

лавлагаа

Мэдээжийн хэрэг, Google-ийн гаргасан эхний арван холбоос бол миний хавчуурга дээрх Airflow хавтасны агуулга юм.

Мөн нийтлэлд ашигласан холбоосууд:

Эх сурвалж: www.habr.com