Luồng khí Apache: Làm cho ETL dễ dàng hơn

Xin chào, tôi là Dmitry Logvinenko - Kỹ sư dữ liệu của Phòng phân tích thuộc nhóm các công ty Vezet.

Tôi sẽ cho bạn biết về một công cụ tuyệt vời để phát triển các quy trình ETL - Luồng khí Apache. Nhưng Luồng không khí rất linh hoạt và nhiều mặt nên bạn nên xem xét kỹ hơn ngay cả khi bạn không tham gia vào các luồng dữ liệu, nhưng có nhu cầu khởi chạy định kỳ bất kỳ quy trình nào và theo dõi quá trình thực hiện của chúng.

Và vâng, tôi sẽ không chỉ nói mà còn chỉ ra: chương trình có rất nhiều mã, ảnh chụp màn hình và đề xuất.

Luồng khí Apache: Làm cho ETL dễ dàng hơn
Những gì bạn thường thấy khi bạn google từ Airflow / Wikimedia Commons

Mục lục

Giới thiệu

Luồng khí Apache cũng giống như Django:

  • viết bằng trăn
  • có một bảng quản trị tuyệt vời,
  • có thể mở rộng vô thời hạn

- chỉ tốt hơn, và nó được tạo ra cho các mục đích hoàn toàn khác, cụ thể là (như nó được viết trước kat):

  • chạy và giám sát các tác vụ trên số lượng máy không giới hạn (càng nhiều Celery / Kubernetes và lương tâm của bạn sẽ cho phép bạn)
  • với việc tạo quy trình làm việc động từ mã Python rất dễ viết và dễ hiểu
  • và khả năng kết nối bất kỳ cơ sở dữ liệu và API nào với nhau bằng cách sử dụng cả các thành phần tạo sẵn và plugin tạo tại nhà (cực kỳ đơn giản).

Chúng tôi sử dụng Apache Airflow như thế này:

  • chúng tôi thu thập dữ liệu từ nhiều nguồn khác nhau (nhiều phiên bản SQL Server và PostgreSQL, nhiều API khác nhau với số liệu ứng dụng, thậm chí 1C) trong DWH và ODS (chúng tôi có Vertica và Clickhouse).
  • tiên tiến như thế nào cron, bắt đầu quá trình hợp nhất dữ liệu trên ODS, đồng thời giám sát việc bảo trì chúng.

Cho đến gần đây, nhu cầu của chúng tôi được đáp ứng bởi một máy chủ nhỏ có 32 lõi và 50 GB RAM. Trong Airflow, điều này hoạt động:

  • hơn 200 ngày (thực ra là quy trình công việc, trong đó chúng tôi nhồi nhét các tác vụ),
  • trong mỗi trung bình 70 nhiệm vụ,
  • sự tốt lành này bắt đầu (cũng ở mức trung bình) mỗi giờ một lần.

Và về cách chúng tôi mở rộng, tôi sẽ viết bên dưới, nhưng bây giờ hãy xác định vấn đề über mà chúng tôi sẽ giải quyết:

Có ba Máy chủ SQL ban đầu, mỗi máy có 50 cơ sở dữ liệu - tương ứng là các phiên bản của một dự án, chúng có cùng cấu trúc (hầu như ở mọi nơi, mua-ha-ha), có nghĩa là mỗi máy có một bảng Đơn hàng (may mắn thay, một bảng có bảng đó tên có thể được đẩy vào bất kỳ doanh nghiệp nào). Chúng tôi lấy dữ liệu bằng cách thêm các trường dịch vụ (máy chủ nguồn, cơ sở dữ liệu nguồn, ID tác vụ ETL) và ném chúng vào Vertica chẳng hạn.

Chúng ta hãy đi!

Phần chính, thực tế (và một chút lý thuyết)

Tại sao nó lại dành cho chúng tôi (và cho bạn)

Khi cây lớn và tôi đơn giản SQL-schik trong một cửa hàng bán lẻ ở Nga, chúng tôi đã đánh lừa các quy trình ETL hay còn gọi là luồng dữ liệu bằng hai công cụ có sẵn cho chúng tôi:

  • Trung tâm Điện lực Informatica - một hệ thống cực kỳ lan rộng, cực kỳ hiệu quả, với phần cứng riêng, phiên bản riêng. Tôi đã sử dụng God forbid 1% khả năng của nó. Tại sao? Chà, trước hết, giao diện này, đâu đó từ những năm 380, đã gây áp lực lên chúng tôi về mặt tinh thần. Thứ hai, thiết bị này được thiết kế cho các quy trình cực kỳ lạ mắt, tái sử dụng thành phần dữ dội và các thủ thuật doanh nghiệp rất quan trọng khác. Chúng tôi sẽ không nói gì về chi phí của nó, chẳng hạn như cánh của Airbus AXNUMX / năm.

    Coi chừng, một ảnh chụp màn hình có thể làm tổn thương những người dưới 30 tuổi một chút

    Luồng khí Apache: Làm cho ETL dễ dàng hơn

  • Máy chủ tích hợp máy chủ SQL - chúng tôi đã sử dụng đồng chí này trong các luồng nội bộ dự án của mình. Chà, trên thực tế: chúng tôi đã sử dụng SQL Server và sẽ không hợp lý nếu không sử dụng các công cụ ETL của nó. Mọi thứ trong đó đều tốt: cả giao diện đẹp và báo cáo tiến độ ... Nhưng đây không phải là lý do tại sao chúng tôi yêu thích các sản phẩm phần mềm, ồ, không phải vì điều này. phiên bản nó dtsx (là XML với các nút được xáo trộn khi lưu) chúng ta có thể, nhưng vấn đề là gì? Làm thế nào về việc tạo một gói nhiệm vụ sẽ kéo hàng trăm bảng từ máy chủ này sang máy chủ khác? Vâng, một trăm, ngón trỏ của bạn sẽ rơi ra từ hai mươi mảnh, nhấp vào nút chuột. Nhưng nó chắc chắn trông thời trang hơn:

    Luồng khí Apache: Làm cho ETL dễ dàng hơn

Chúng tôi chắc chắn đã tìm cách thoát ra. Trường hợp chẵn gần như đã đến với trình tạo gói SSIS tự viết ...

…và rồi một công việc mới tìm thấy tôi. Và Apache Airflow đã vượt qua tôi về điều đó.

Khi tôi phát hiện ra rằng các mô tả quy trình ETL là mã Python đơn giản, tôi đã không nhảy cẫng lên vì sung sướng. Đây là cách các luồng dữ liệu được tạo phiên bản và khác biệt, đồng thời đổ các bảng có cấu trúc duy nhất từ ​​​​hàng trăm cơ sở dữ liệu vào một mục tiêu đã trở thành vấn đề của mã Python trong một màn hình rưỡi hoặc hai màn hình 13 inch.

Lắp ráp cụm

Chúng ta đừng sắp xếp một trường mẫu giáo hoàn toàn và không nói về những điều hoàn toàn rõ ràng ở đây, chẳng hạn như cài đặt Luồng không khí, cơ sở dữ liệu bạn đã chọn, Cần tây và các trường hợp khác được mô tả trong bến cảng.

Để chúng tôi có thể ngay lập tức bắt đầu thí nghiệm, tôi đã phác thảo docker-compose.yml trong đó:

  • Hãy thực sự nâng cao Luồng khí: Bộ lập lịch, Máy chủ web. Flower cũng sẽ quay ở đó để theo dõi nhiệm vụ của Celery (vì nó đã được đẩy vào apache/airflow:1.10.10-python3.7, nhưng chúng tôi không phiền)
  • PostgreSQL, trong đó Airflow sẽ ghi thông tin dịch vụ của nó (dữ liệu lập lịch, thống kê thực thi, v.v.) và Celery sẽ đánh dấu các tác vụ đã hoàn thành;
  • Redis, sẽ hoạt động như một nhà môi giới nhiệm vụ cho Celery;
  • công nhân cần tây, sẽ tham gia vào việc thực hiện trực tiếp các nhiệm vụ.
  • Đến thư mục ./dags chúng tôi sẽ thêm các tệp của chúng tôi với mô tả về dags. Chúng sẽ được nhặt ngay lập tức, vì vậy không cần phải sắp xếp cả đống sau mỗi lần hắt hơi.

Ở một số nơi, mã trong các ví dụ không được hiển thị đầy đủ (để không làm lộn xộn văn bản), nhưng ở đâu đó, nó đã được sửa đổi trong quá trình này. Có thể tìm thấy các ví dụ mã làm việc hoàn chỉnh trong kho lưu trữ https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Ghi chú:

  • Trong quá trình lắp ráp bố cục, tôi chủ yếu dựa vào hình ảnh nổi tiếng puckel/docker-luồng không khí - hãy chắc chắn để kiểm tra nó ra. Có lẽ bạn không cần bất cứ điều gì khác trong cuộc sống của bạn.
  • Tất cả các cài đặt Luồng không khí có sẵn không chỉ thông qua airflow.cfg, mà còn thông qua các biến môi trường (nhờ các nhà phát triển) mà tôi đã lợi dụng một cách ác ý.
  • Đương nhiên, nó chưa sẵn sàng để sản xuất: Tôi đã cố tình không dán nhịp tim vào các thùng chứa, tôi không bận tâm đến vấn đề bảo mật. Nhưng tôi đã làm điều tối thiểu phù hợp với những người thử nghiệm của chúng tôi.
  • Lưu ý rằng:
    • Thư mục dag phải có thể truy cập được đối với cả người lên lịch và người lao động.
    • Điều tương tự cũng áp dụng cho tất cả các thư viện của bên thứ ba - tất cả chúng phải được cài đặt trên các máy có bộ lập lịch và công nhân.

Chà, bây giờ thật đơn giản:

$ docker-compose up --scale worker=3

Sau khi mọi thứ ổn định, bạn có thể xem các giao diện web:

Khái niệm cơ bản

Nếu bạn không hiểu bất cứ điều gì trong tất cả các "dags" này, thì đây là một từ điển ngắn:

  • Scheduler - người chú quan trọng nhất trong Airflow, người kiểm soát việc robot làm việc chăm chỉ chứ không phải con người: theo dõi lịch trình, cập nhật dags, khởi chạy các nhiệm vụ.

    Nói chung, trong các phiên bản cũ hơn, anh ta gặp vấn đề với bộ nhớ (không, không phải mất trí nhớ mà là rò rỉ) và tham số kế thừa thậm chí vẫn còn trong cấu hình run_duration - khoảng thời gian khởi động lại của nó. Nhưng bây giờ mọi thứ đã ổn.

  • DAG (hay còn gọi là "dag") - "đồ thị tuần hoàn có hướng", nhưng định nghĩa như vậy sẽ ít người biết, nhưng trên thực tế, nó là nơi chứa các tác vụ tương tác với nhau (xem bên dưới) hoặc tương tự Gói trong SSIS và Quy trình làm việc trong Informatica .

    Ngoài dags, vẫn có thể có subdags, nhưng chúng tôi rất có thể sẽ không nhận được chúng.

  • DAG chạy - khởi tạo dag, được gán riêng execution_date. Các dagran của cùng một dag có thể hoạt động song song (tất nhiên nếu bạn đã thực hiện các nhiệm vụ của mình là idempotent).
  • Nhà điều hành là những đoạn mã chịu trách nhiệm thực hiện một hành động cụ thể. Có ba loại toán tử:
    • hoạt độngnhư yêu thích của chúng tôi PythonOperator, có thể thực thi bất kỳ mã Python (hợp lệ) nào;
    • chuyển, vận chuyển dữ liệu từ nơi này sang nơi khác, chẳng hạn như, MsSqlToHiveTransfer;
    • cảm biến mặt khác, nó sẽ cho phép bạn phản ứng hoặc làm chậm quá trình thực hiện tiếp theo của dag cho đến khi một sự kiện xảy ra. HttpSensor có thể kéo điểm cuối đã chỉ định và khi phản hồi mong muốn đang chờ, hãy bắt đầu chuyển GoogleCloudStorageToS3Operator. Một tâm trí tò mò sẽ hỏi: “tại sao? Rốt cuộc, bạn có thể thực hiện lặp lại ngay trong nhà điều hành! Và sau đó, để không làm tắc nghẽn nhóm nhiệm vụ với các toán tử bị treo. Cảm biến khởi động, kiểm tra và chết trước lần thử tiếp theo.
  • Nhiệm vụ - các toán tử được khai báo, bất kể loại nào và được gắn vào dag đều được thăng hạng nhiệm vụ.
  • ví dụ nhiệm vụ - khi người lập kế hoạch chung quyết định rằng đã đến lúc đưa các nhiệm vụ vào trận chiến với người biểu diễn (ngay tại chỗ, nếu chúng ta sử dụng LocalExecutor hoặc đến một nút từ xa trong trường hợp CeleryExecutor), nó gán một ngữ cảnh cho chúng (nghĩa là một tập hợp các biến - tham số thực thi), mở rộng các mẫu lệnh hoặc truy vấn và nhóm chúng.

Chúng tôi tạo ra các nhiệm vụ

Trước tiên, hãy phác thảo sơ đồ chung của doug của chúng tôi, sau đó chúng tôi sẽ đi sâu vào chi tiết hơn, bởi vì chúng tôi áp dụng một số giải pháp không tầm thường.

Vì vậy, ở dạng đơn giản nhất, một dag như vậy sẽ trông như thế này:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Hãy tìm ra nó:

  • Đầu tiên, chúng tôi nhập các lib cần thiết và thứ gì khác;
  • sql_server_ds - List[namedtuple[str, str]] với tên của các kết nối từ Kết nối luồng không khí và cơ sở dữ liệu mà chúng tôi sẽ lấy đĩa của mình;
  • dag - thông báo về doug của chúng tôi, nhất thiết phải có trong globals(), nếu không Airflow sẽ không tìm thấy nó. Doug cũng cần phải nói:
    • Tên của anh ấy là gì orders - tên này sau đó sẽ xuất hiện trong giao diện web,
    • rằng anh ấy sẽ làm việc từ nửa đêm ngày XNUMX tháng XNUMX,
    • và nó sẽ chạy, khoảng 6 giờ một lần (đối với những người khó tính ở đây thay vì timedelta() có thể chấp nhận cron-đường kẻ 0 0 0/6 ? * * *, ít thú vị hơn - một biểu thức như @daily);
  • workflow() sẽ làm công việc chính, nhưng không phải bây giờ. Hiện tại, chúng tôi sẽ chỉ kết xuất ngữ cảnh của mình vào nhật ký.
  • Và bây giờ là phép thuật tạo nhiệm vụ đơn giản:
    • chúng tôi chạy qua các nguồn của chúng tôi;
    • khởi tạo PythonOperator, cái sẽ thực thi hình nộm của chúng ta workflow(). Đừng quên chỉ định một tên duy nhất (trong dag) của nhiệm vụ và tự buộc dag. Lá cờ provide_context đến lượt nó, sẽ đổ các đối số bổ sung vào hàm mà chúng tôi sẽ thu thập cẩn thận bằng cách sử dụng **context.

Hiện tại, đó là tất cả. Những gì chúng tôi nhận được:

  • dag mới trong giao diện web,
  • một trăm rưỡi tác vụ sẽ được thực hiện song song (nếu Luồng khí, cài đặt Cần tây và dung lượng máy chủ cho phép).

Vâng, gần như đã nhận được nó.

Luồng khí Apache: Làm cho ETL dễ dàng hơn
Ai sẽ cài đặt các phụ thuộc?

Để đơn giản hóa toàn bộ điều này, tôi bắt đầu docker-compose.yml Chế biến requirements.txt trên tất cả các nút.

Bây giờ nó đã biến mất:

Luồng khí Apache: Làm cho ETL dễ dàng hơn

Hình vuông màu xám là các trường hợp nhiệm vụ được xử lý bởi bộ lập lịch.

Chúng tôi đợi một chút, các nhiệm vụ được công nhân tóm tắt:

Luồng khí Apache: Làm cho ETL dễ dàng hơn

Tất nhiên, những chiếc màu xanh lá cây đã hoàn thành xuất sắc công việc của mình. Quỷ đỏ không mấy thành công.

Nhân tiện, không có thư mục nào trên sản phẩm của chúng tôi ./dags, không có sự đồng bộ hóa giữa các máy - tất cả các lỗi đều nằm ở git trên Gitlab của chúng tôi và Gitlab CI phân phối các bản cập nhật cho các máy khi hợp nhất vào master.

Đôi điều về Hoa

Trong khi các công nhân đang đập núm vú giả của chúng ta, hãy nhớ đến một công cụ khác có thể cho chúng ta thấy điều gì đó - Bông hoa.

Trang đầu tiên có thông tin tóm tắt về worker node:

Luồng khí Apache: Làm cho ETL dễ dàng hơn

Trang cường độ cao nhất với các tác vụ đã hoạt động:

Luồng khí Apache: Làm cho ETL dễ dàng hơn

Trang nhàm chán nhất với trạng thái của nhà môi giới của chúng tôi:

Luồng khí Apache: Làm cho ETL dễ dàng hơn

Trang sáng nhất là với biểu đồ trạng thái nhiệm vụ và thời gian thực hiện của chúng:

Luồng khí Apache: Làm cho ETL dễ dàng hơn

Chúng tôi tải quá tải

Vì vậy, tất cả các nhiệm vụ đã hoàn thành, bạn có thể mang những người bị thương đi.

Luồng khí Apache: Làm cho ETL dễ dàng hơn

Và có nhiều người bị thương - vì lý do này hay lý do khác. Trong trường hợp sử dụng đúng Luồng không khí, chính những ô vuông này cho biết rằng dữ liệu chắc chắn không đến.

Bạn cần xem nhật ký và khởi động lại các trường hợp tác vụ bị lỗi.

Bằng cách nhấp vào bất kỳ hình vuông nào, chúng tôi sẽ thấy các hành động có sẵn cho chúng tôi:

Luồng khí Apache: Làm cho ETL dễ dàng hơn

Bạn có thể lấy và làm sạch những người đã ngã xuống. Nghĩa là, chúng ta quên rằng đã xảy ra lỗi ở đó và nhiệm vụ tương tự sẽ được chuyển đến bộ lập lịch.

Luồng khí Apache: Làm cho ETL dễ dàng hơn

Rõ ràng là làm điều này với con chuột có tất cả các ô vuông màu đỏ là không nhân đạo cho lắm - đây không phải là điều chúng tôi mong đợi từ Airflow. Đương nhiên, chúng ta có vũ khí hủy diệt hàng loạt: Browse/Task Instances

Luồng khí Apache: Làm cho ETL dễ dàng hơn

Hãy chọn mọi thứ cùng một lúc và đặt lại về XNUMX, nhấp vào mục chính xác:

Luồng khí Apache: Làm cho ETL dễ dàng hơn

Sau khi dọn dẹp, taxi của chúng tôi trông như thế này (họ đang đợi người lên lịch cho họ):

Luồng khí Apache: Làm cho ETL dễ dàng hơn

Kết nối, móc và các biến khác

Đã đến lúc xem xét DAG tiếp theo, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Mọi người đã bao giờ thực hiện cập nhật báo cáo chưa? Đây lại là cô ấy: có một danh sách các nguồn lấy dữ liệu từ đâu; có một danh sách nơi để đặt; đừng quên bấm còi khi mọi thứ xảy ra hoặc hỏng hóc (tốt, đây không phải là về chúng tôi, không).

Hãy xem lại tệp và xem nội dung mới ít người biết:

  • from commons.operators import TelegramBotSendMessage - không có gì ngăn cản chúng tôi tạo các toán tử của riêng mình, điều mà chúng tôi đã tận dụng bằng cách tạo một trình bao bọc nhỏ để gửi tin nhắn đến Unblocked. (Chúng tôi sẽ nói thêm về toán tử này bên dưới);
  • default_args={} - dag có thể phân phối các đối số giống nhau cho tất cả các toán tử của nó;
  • to='{{ var.value.all_the_kings_men }}' - cánh đồng to chúng tôi sẽ không mã hóa cứng mà được tạo động bằng Jinja và một biến có danh sách email mà tôi đã cẩn thận đưa vào Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - điều kiện để khởi động người vận hành. Trong trường hợp của chúng tôi, bức thư sẽ chỉ đến tay các ông chủ nếu tất cả các phụ thuộc đã được giải quyết thành công;
  • tg_bot_conn_id='tg_main' - tranh luận conn_id chấp nhận ID kết nối mà chúng tôi tạo trong Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - tin nhắn trong Telegram sẽ chỉ bay đi nếu có nhiệm vụ bị rơi;
  • task_concurrency=1 - chúng tôi cấm khởi chạy đồng thời một số phiên bản tác vụ của một tác vụ. Nếu không, chúng tôi sẽ nhận được sự khởi chạy đồng thời của một số VerticaOperator (nhìn vào một cái bàn);
  • report_update >> [email, tg] - mọi điều VerticaOperator hội tụ trong việc gửi thư và tin nhắn, như thế này:
    Luồng khí Apache: Làm cho ETL dễ dàng hơn

    Nhưng vì các nhà khai thác trình thông báo có các điều kiện khởi chạy khác nhau nên chỉ có một điều kiện hoạt động. Trong Chế độ xem dạng cây, mọi thứ trông kém trực quan hơn một chút:
    Luồng khí Apache: Làm cho ETL dễ dàng hơn

tôi sẽ nói vài lời về macro và bạn bè của họ - biến.

Macro là trình giữ chỗ Jinja có thể thay thế nhiều thông tin hữu ích khác nhau thành các đối số toán tử. Ví dụ, như thế này:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} sẽ mở rộng đến nội dung của biến bối cảnh execution_date trong định dạng YYYY-MM-DD: 2020-07-14. Phần tốt nhất là các biến bối cảnh được đóng đinh vào một cá thể nhiệm vụ cụ thể (một hình vuông trong Chế độ xem dạng cây) và khi được khởi động lại, các trình giữ chỗ sẽ mở rộng thành các giá trị giống nhau.

Các giá trị được gán có thể được xem bằng cách sử dụng nút Kết xuất trên mỗi phiên bản tác vụ. Đây là cách thực hiện nhiệm vụ gửi thư:

Luồng khí Apache: Làm cho ETL dễ dàng hơn

Và như vậy trong nhiệm vụ gửi tin nhắn:

Luồng khí Apache: Làm cho ETL dễ dàng hơn

Danh sách đầy đủ các macro tích hợp sẵn cho phiên bản mới nhất có sẵn tại đây: tham chiếu macro

Hơn nữa, với sự trợ giúp của các plugin, chúng ta có thể khai báo các macro của riêng mình, nhưng đó lại là một câu chuyện khác.

Ngoài những thứ được xác định trước, chúng ta có thể thay thế các giá trị của các biến của mình (tôi đã sử dụng điều này trong đoạn mã trên). Hãy tạo trong Admin/Variables đôi điều:

Luồng khí Apache: Làm cho ETL dễ dàng hơn

Mọi thứ bạn có thể sử dụng:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Giá trị có thể là vô hướng hoặc cũng có thể là JSON. Trong trường hợp JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

chỉ cần sử dụng đường dẫn đến khóa mong muốn: {{ var.json.bot_config.bot.token }}.

Tôi thực sự sẽ nói một từ và hiển thị một ảnh chụp màn hình về kết nối. Mọi thứ đều sơ cấp ở đây: trên trang Admin/Connections chúng tôi tạo một kết nối, thêm thông tin đăng nhập / mật khẩu của chúng tôi và các thông số cụ thể hơn ở đó. Như thế này:

Luồng khí Apache: Làm cho ETL dễ dàng hơn

Mật khẩu có thể được mã hóa (kỹ lưỡng hơn mặc định) hoặc bạn có thể loại bỏ loại kết nối (như tôi đã làm cho tg_main) - thực tế là danh sách các loại được lập trình cố định trong các mô hình Luồng không khí và không thể mở rộng mà không truy cập vào mã nguồn (nếu đột nhiên tôi không tìm kiếm trên google một cái gì đó, vui lòng sửa cho tôi), nhưng sẽ không có gì ngăn cản chúng tôi nhận tín dụng chỉ bằng cách tên.

Bạn cũng có thể thực hiện một số kết nối có cùng tên: trong trường hợp này, phương thức BaseHook.get_connection(), giúp chúng tôi kết nối theo tên, sẽ cung cấp ngẫu nhiên từ một số trùng tên (sẽ hợp lý hơn nếu tạo Round Robin, nhưng hãy để nó theo lương tâm của các nhà phát triển Airflow).

Biến và Kết nối chắc chắn là những công cụ tuyệt vời, nhưng điều quan trọng là không làm mất sự cân bằng: bạn lưu trữ phần nào trong quy trình của mình trong chính mã và phần nào bạn cung cấp cho Luồng không khí để lưu trữ. Một mặt, có thể thuận tiện để nhanh chóng thay đổi giá trị, chẳng hạn như hộp thư, thông qua giao diện người dùng. Mặt khác, đây vẫn là sự quay trở lại với thao tác nhấp chuột mà chúng tôi (tôi) muốn loại bỏ.

Làm việc với các kết nối là một trong những nhiệm vụ móc. Nói chung, Airflow hooks là các điểm để kết nối nó với các dịch vụ và thư viện của bên thứ ba. Ví dụ, JiraHook sẽ mở một client để chúng ta tương tác với Jira (có thể di chuyển task qua lại), và với sự trợ giúp của SambaHook bạn có thể đẩy một tệp cục bộ vào smb-điểm.

Phân tích cú pháp toán tử tùy chỉnh

Và chúng tôi đã tiến gần đến việc xem nó được tạo ra như thế nào TelegramBotSendMessage

commons/operators.py với toán tử thực tế:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Ở đây, giống như mọi thứ khác trong Airflow, mọi thứ đều rất đơn giản:

  • Thừa hưởng từ BaseOperator, thực hiện khá nhiều thứ dành riêng cho Luồng khí (hãy xem sự giải trí của bạn)
  • trường khai báo template_fields, trong đó Jinja sẽ tìm macro để xử lý.
  • Sắp xếp các lập luận đúng cho __init__(), đặt giá trị mặc định khi cần thiết.
  • Chúng tôi cũng không quên việc khởi tạo tổ tiên.
  • Đã mở hook tương ứng TelegramBotHooknhận được một đối tượng khách hàng từ nó.
  • Phương thức bị ghi đè (được xác định lại) BaseOperator.execute(), mà Airfow sẽ co giật khi đến lúc khởi chạy nhà điều hành - trong đó chúng tôi sẽ thực hiện hành động chính, quên đăng nhập. (Nhân tiện, chúng tôi đăng nhập, ngay trong stdout и stderr - Luồng không khí sẽ chặn mọi thứ, bao bọc đẹp đẽ, phân hủy khi cần thiết.)

Hãy xem những gì chúng ta có commons/hooks.py. Phần đầu tiên của tệp, với chính cái móc:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Tôi thậm chí không biết phải giải thích gì ở đây, tôi sẽ chỉ lưu ý những điểm quan trọng:

  • Chúng tôi kế thừa, suy nghĩ về các đối số - trong hầu hết các trường hợp, nó sẽ là một: conn_id;
  • Ghi đè các phương pháp tiêu chuẩn: Tôi giới hạn bản thân get_conn(), trong đó mình lấy thông số kết nối theo tên và chỉ lấy phần extra (đây là trường JSON), trong đó tôi (theo hướng dẫn của riêng tôi!) đặt mã thông báo bot Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • tôi tạo một phiên bản của chúng tôi TelegramBot, cung cấp cho nó một mã thông báo cụ thể.

Đó là tất cả. Bạn có thể lấy một khách hàng từ một cái móc bằng cách sử dụng TelegramBotHook().clent hoặc TelegramBotHook().get_conn().

Và phần thứ hai của tệp, trong đó tôi tạo một trình bao bọc vi mô cho API REST của Telegram, để không bị kéo giống nhau python-telegram-bot cho một phương pháp sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Cách chính xác là thêm tất cả lên: TelegramBotSendMessage, TelegramBotHook, TelegramBot - trong plugin, đặt vào kho lưu trữ công khai và cung cấp cho Nguồn mở.

Trong khi chúng tôi đang nghiên cứu tất cả những điều này, các bản cập nhật báo cáo của chúng tôi đã không thành công và gửi cho tôi một thông báo lỗi trong kênh. Em check xem có bị sao không...

Luồng khí Apache: Làm cho ETL dễ dàng hơn
Một cái gì đó đã phá vỡ trong doge của chúng tôi! Đó không phải là những gì chúng ta đã mong đợi? Chính xác!

Bạn sẽ đổ?

Bạn có cảm thấy tôi đã bỏ lỡ một cái gì đó? Hình như nó hứa chuyển dữ liệu từ SQL Server sang Vertica rồi nó lấy đi lạc chủ đề, thằng lưu manh!

Sự tàn bạo này là cố ý, tôi chỉ đơn giản là phải giải mã một số thuật ngữ cho bạn. Bây giờ bạn có thể đi xa hơn.

Kế hoạch của chúng tôi là thế này:

  1. làm dag
  2. Tạo nhiệm vụ
  3. Xem mọi thứ đẹp như thế nào
  4. Chỉ định số phiên để điền
  5. Lấy dữ liệu từ SQL Server
  6. Đưa dữ liệu vào Vertica
  7. Thu thập số liệu thống kê

Vì vậy, để thiết lập và chạy tất cả những thứ này, tôi đã thực hiện một bổ sung nhỏ cho docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Ở đó chúng tôi nâng cao:

  • Vertica làm chủ nhà dwh với các cài đặt mặc định nhất,
  • ba phiên bản của SQL Server,
  • chúng tôi điền vào cơ sở dữ liệu sau một số dữ liệu (trong mọi trường hợp không xem xét mssql_init.py!)

Chúng tôi khởi chạy tất cả những điều tốt đẹp với sự trợ giúp của một lệnh phức tạp hơn một chút so với lần trước:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Những gì trình ngẫu nhiên kỳ diệu của chúng tôi tạo ra, bạn có thể sử dụng vật phẩm Data Profiling/Ad Hoc Query:

Luồng khí Apache: Làm cho ETL dễ dàng hơn
Điều chính là không hiển thị nó cho các nhà phân tích

xây dựng trên phiên ETL Tôi sẽ không, mọi thứ đều tầm thường ở đó: chúng tôi tạo một cơ sở, có một dấu hiệu trong đó, chúng tôi bọc mọi thứ bằng trình quản lý bối cảnh và bây giờ chúng tôi làm điều này:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

phiên.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Thời giờ đã đến thu thập dữ liệu của chúng tôi từ một trăm rưỡi bảng của chúng tôi. Hãy làm điều này với sự trợ giúp của các dòng rất khiêm tốn:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Với sự trợ giúp của một cái móc, chúng tôi nhận được từ Airflow pymssql-kết nối
  2. Hãy thay thế một hạn chế ở dạng ngày tháng vào yêu cầu - nó sẽ được công cụ mẫu đưa vào chức năng.
  3. Cho ăn yêu cầu của chúng tôi pandasai sẽ nhận được chúng tôi DataFrame - nó sẽ hữu ích cho chúng ta trong tương lai.

Tôi đang sử dụng thay thế {dt} thay vì một tham số yêu cầu %s không phải vì tôi là một Pinocchio độc ác, mà bởi vì pandas không thể xử lý pymssql và trượt cái cuối cùng params: Listmặc dù anh ấy rất muốn tuple.
Cũng lưu ý rằng nhà phát triển pymssql quyết định không hỗ trợ anh ta nữa, và đã đến lúc chuyển ra ngoài pyodbc.

Hãy xem những gì Airflow đã nhồi nhét các đối số của các chức năng của chúng ta:

Luồng khí Apache: Làm cho ETL dễ dàng hơn

Nếu không có dữ liệu, thì không có điểm nào để tiếp tục. Nhưng coi như điền thành công cũng là chuyện lạ. Nhưng đây không phải là một sai lầm. A-ah-ah, phải làm sao đây?! Và đây là những gì:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException sẽ cho Airflow biết rằng không có lỗi, nhưng chúng tôi bỏ qua tác vụ. Giao diện sẽ không có ô vuông xanh hay đỏ mà là màu hồng.

Hãy ném dữ liệu của chúng tôi nhiều cột:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Cụ thể

  • Cơ sở dữ liệu mà từ đó chúng tôi nhận đơn đặt hàng,
  • ID của phiên tràn ngập của chúng tôi (nó sẽ khác cho mọi nhiệm vụ),
  • Một hàm băm từ nguồn và ID đơn hàng - để trong cơ sở dữ liệu cuối cùng (nơi mọi thứ được đổ vào một bảng), chúng tôi có một ID đơn hàng duy nhất.

Bước áp chót vẫn còn: đổ mọi thứ vào Vertica. Và thật kỳ lạ, một trong những cách ngoạn mục và hiệu quả nhất để thực hiện điều này là thông qua CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Chúng tôi đang làm một máy thu đặc biệt StringIO.
  2. pandas sẽ vui lòng đặt của chúng tôi DataFrame ở dạng CSV-dòng.
  3. Hãy mở kết nối với Vertica yêu thích của chúng ta bằng một cái móc.
  4. Và bây giờ với sự giúp đỡ copy() gửi dữ liệu của chúng tôi trực tiếp đến Vertika!

Chúng tôi sẽ lấy từ trình điều khiển có bao nhiêu dòng đã được lấp đầy và thông báo cho người quản lý phiên rằng mọi thứ đều ổn:

session.loaded_rows = cursor.rowcount
session.successful = True

Đó là tất cả.

Khi bán, chúng tôi tạo bảng mục tiêu theo cách thủ công. Ở đây tôi cho phép mình một chiếc máy nhỏ:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

tôi đang dùng VerticaOperator() Tôi tạo một lược đồ cơ sở dữ liệu và một bảng (tất nhiên là nếu chúng chưa tồn tại). Điều chính là sắp xếp chính xác các phụ thuộc:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Tổng hợp

- Chà, - chú chuột nhỏ nói, - phải không, bây giờ
Bạn có tin rằng tôi là con vật khủng khiếp nhất trong rừng?

Julia Donaldson, Kẻ cộc cằn

Tôi nghĩ nếu tôi và các đồng nghiệp của mình có một cuộc thi: ai sẽ nhanh chóng tạo và khởi chạy một quy trình ETL từ đầu: họ với SSIS và một con chuột còn tôi với Airflow... Và sau đó chúng tôi cũng sẽ so sánh mức độ dễ bảo trì... Wow, tôi nghĩ bạn sẽ đồng ý rằng tôi sẽ đánh bại họ trên mọi mặt trận!

Nếu nghiêm túc hơn một chút, thì Apache Airflow - bằng cách mô tả các quy trình dưới dạng mã chương trình - đã hoàn thành công việc của tôi nhiều thoải mái và thú vị hơn.

Khả năng mở rộng không giới hạn của nó, cả về trình cắm và khả năng mở rộng, mang đến cho bạn cơ hội sử dụng Luồng không khí ở hầu hết mọi lĩnh vực: ngay cả trong toàn bộ chu trình thu thập, chuẩn bị và xử lý dữ liệu, ngay cả khi phóng tên lửa (lên sao Hỏa, khóa học).

Phần cuối cùng, tài liệu tham khảo và thông tin

Cào chúng tôi đã thu thập cho bạn

  • start_date. Vâng, đây đã là một meme địa phương. Thông qua lập luận chính của Doug start_date tất cả đều vượt qua. Tóm lại, nếu bạn chỉ định trong start_date ngày hiện tại, và schedule_interval - một ngày, sau đó DAG sẽ bắt đầu vào ngày mai không sớm hơn.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Và không còn vấn đề gì nữa.

    Có một lỗi thời gian chạy khác liên quan đến nó: Task is missing the start_date parameter, điều này thường chỉ ra rằng bạn quên liên kết với toán tử dag.

  • Tất cả trên một máy. Có, và các cơ sở (Bản thân luồng không khí và lớp phủ của chúng tôi), máy chủ web, bộ lập lịch trình và công nhân. Và nó thậm chí còn hoạt động. Nhưng theo thời gian, số lượng tác vụ cho các dịch vụ tăng lên và khi PostgreSQL bắt đầu phản hồi chỉ mục trong 20 giây thay vì 5 mili giây, chúng tôi đã lấy nó và mang nó đi.
  • LocalExecutor. Vâng, chúng tôi vẫn đang ngồi trên đó, và chúng tôi đã đến bờ vực thẳm. LocalExecutor cho đến nay là đủ đối với chúng tôi, nhưng giờ đã đến lúc mở rộng với ít nhất một worker và chúng tôi sẽ phải làm việc chăm chỉ để chuyển sang CeleryExecutor. Và do thực tế là bạn có thể làm việc với nó trên một máy, nên không có gì ngăn cản bạn sử dụng Celery ngay cả trên máy chủ, điều mà “tất nhiên, thành thật mà nói, sẽ không bao giờ được đưa vào sản xuất!”
  • không sử dụng công cụ tích hợp:
    • Kết nối để lưu trữ thông tin đăng nhập dịch vụ,
    • bỏ lỡ SLA để đáp ứng các nhiệm vụ không được thực hiện đúng hạn,
    • xcom để trao đổi siêu dữ liệu (tôi đã nói metadata!) giữa các tác vụ dag.
  • Lạm dụng thư. Vậy tôi có thể nói gì? Cảnh báo đã được thiết lập cho tất cả các lần lặp lại các nhiệm vụ bị lỗi. Bây giờ, Gmail công việc của tôi có hơn 90 nghìn email từ Airflow và mõm thư trên web từ chối nhận và xóa hơn 100 email cùng lúc.

Nhiều cạm bẫy hơn: Cạm bẫy luồng không khí Apache

Nhiều công cụ tự động hóa hơn

Để chúng tôi làm việc nhiều hơn bằng đầu chứ không phải bằng tay, Airflow đã chuẩn bị cho chúng tôi điều này:

  • REST API - anh ta vẫn có trạng thái Thí nghiệm, điều đó không ngăn cản anh ta làm việc. Với nó, bạn không chỉ có thể nhận thông tin về dag và tác vụ mà còn có thể dừng/bắt đầu dag, tạo DAG Run hoặc pool.
  • CLI - nhiều công cụ có sẵn thông qua dòng lệnh không chỉ gây bất tiện khi sử dụng thông qua WebUI mà còn không có. Ví dụ:
    • backfill cần thiết để khởi động lại các trường hợp nhiệm vụ.
      Ví dụ, các nhà phân tích đến và nói: “Còn đồng chí, dữ liệu từ ngày 1 đến ngày 13 tháng XNUMX thật vô lý! Sửa đi, sửa đi, sửa đi, sửa đi! Và bạn là một hob như vậy:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Dịch vụ cơ sở: initdb, resetdb, upgradedb, checkdb.
    • run, cho phép bạn chạy một tác vụ phiên bản và thậm chí ghi điểm trên tất cả các thành phần phụ thuộc. Hơn nữa, bạn có thể chạy nó qua LocalExecutor, ngay cả khi bạn có một cụm Cần tây.
    • Làm khá nhiều điều tương tự test, chỉ trong căn cứ không viết gì.
    • connections cho phép tạo hàng loạt các kết nối từ vỏ.
  • API Python - một cách tương tác khá khó khăn, dành cho các plugin chứ không phải nhúng tay vào nó. Nhưng ai sẽ ngăn cản chúng ta đi đến /home/airflow/dags, chạy ipython và bắt đầu lộn xộn xung quanh? Ví dụ: bạn có thể xuất tất cả các kết nối bằng mã sau:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Đang kết nối với siêu cơ sở dữ liệu Airflow. Tôi không khuyên bạn nên viết thư cho nó, nhưng nhận trạng thái tác vụ cho các số liệu cụ thể khác nhau có thể nhanh hơn và dễ dàng hơn nhiều so với thông qua bất kỳ API nào.

    Giả sử rằng không phải tất cả các nhiệm vụ của chúng tôi đều bình thường, nhưng đôi khi chúng có thể bị lỗi và điều này là bình thường. Nhưng một số tắc nghẽn đã đáng ngờ và cần phải kiểm tra.

    Coi chừng SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

tài liệu tham khảo

Và tất nhiên, mười liên kết đầu tiên từ việc phát hành Google là nội dung của thư mục Airflow từ dấu trang của tôi.

Và các liên kết được sử dụng trong bài viết:

Nguồn: www.habr.com