Airflow là một công cụ giúp phát triển và duy trì các quy trình xử lý dữ liệu hàng loạt một cách thuận tiện và nhanh chóng

Airflow là một công cụ giúp phát triển và duy trì các quy trình xử lý dữ liệu hàng loạt một cách thuận tiện và nhanh chóng

Xin chào, Habr! Trong bài viết này, tôi muốn nói về một công cụ tuyệt vời để phát triển các quy trình xử lý dữ liệu hàng loạt, chẳng hạn như trong cơ sở hạ tầng của DWH của công ty hoặc DataLake của bạn. Chúng ta sẽ nói về Apache Airflow (sau đây gọi là Airflow). Habré đã không được chú ý một cách không công bằng và trong phần chính, tôi sẽ cố gắng thuyết phục bạn rằng ít nhất Airflow cũng đáng xem xét khi chọn bộ lập lịch cho các quy trình ETL/ELT của bạn.

Trước đây, tôi đã viết một loạt bài về chủ đề DWH khi còn làm việc tại Ngân hàng Tinkoff. Bây giờ tôi đã trở thành thành viên của nhóm Mail.Ru Group và đang phát triển một nền tảng phân tích dữ liệu trong lĩnh vực trò chơi. Trên thực tế, khi tin tức và giải pháp thú vị xuất hiện, nhóm của tôi và tôi sẽ nói chuyện ở đây về nền tảng phân tích dữ liệu của chúng tôi.

Mở đầu

Vì vậy, hãy bắt đầu. Luồng không khí là gì? Đây là một thư viện (hoặc bộ thư viện) để phát triển, lập kế hoạch và giám sát các quy trình làm việc. Tính năng chính của Airflow: Mã Python được sử dụng để mô tả (phát triển) các quy trình. Điều này có rất nhiều lợi thế cho việc tổ chức dự án và quá trình phát triển của bạn: về bản chất, dự án ETL (ví dụ) của bạn chỉ là một dự án Python và bạn có thể tổ chức nó theo ý muốn, có tính đến các chi tiết cụ thể về cơ sở hạ tầng, quy mô nhóm và những yêu cầu khác. Về mặt công cụ, mọi thứ đều đơn giản. Sử dụng ví dụ PyCharm + Git. Thật tuyệt vời và rất tiện lợi!

Bây giờ chúng ta hãy xem các thực thể chính của Airflow. Bằng cách hiểu bản chất và mục đích của chúng, bạn có thể tổ chức kiến ​​trúc quy trình của mình một cách tối ưu. Có lẽ thực thể chính là Đồ thị chu kỳ có hướng (sau đây gọi là DAG).

DAG

DAG là một sự liên kết có ý nghĩa nào đó giữa các nhiệm vụ của bạn mà bạn muốn hoàn thành theo một trình tự được xác định nghiêm ngặt theo một lịch trình cụ thể. Airflow cung cấp giao diện web thuận tiện để làm việc với DAG và các thực thể khác:

Airflow là một công cụ giúp phát triển và duy trì các quy trình xử lý dữ liệu hàng loạt một cách thuận tiện và nhanh chóng

DAG có thể trông như thế này:

Airflow là một công cụ giúp phát triển và duy trì các quy trình xử lý dữ liệu hàng loạt một cách thuận tiện và nhanh chóng

Nhà phát triển, khi thiết kế DAG, sẽ đặt ra một tập hợp các toán tử về các nhiệm vụ trong DAG sẽ được xây dựng. Ở đây chúng ta đến với một thực thể quan trọng khác: Người vận hành luồng không khí.

Người vận hành

Toán tử là một thực thể trên cơ sở các phiên bản công việc được tạo ra, nó mô tả những gì sẽ xảy ra trong quá trình thực thi một phiên bản công việc. Luồng khí phát hành từ GitHub đã chứa một tập hợp các toán tử sẵn sàng để sử dụng. Ví dụ:

  • BashOperator - toán tử để thực thi lệnh bash.
  • PythonOperator - toán tử để gọi mã Python.
  • EmailOperator - toán tử gửi email.
  • HTTPOperator - toán tử để làm việc với các yêu cầu http.
  • SqlOperator - toán tử để thực thi mã SQL.
  • Cảm biến là một toán tử để chờ đợi một sự kiện (sự xuất hiện của thời gian cần thiết, sự xuất hiện của tệp được yêu cầu, một dòng trong cơ sở dữ liệu, phản hồi từ API, v.v.).

Có nhiều toán tử cụ thể hơn: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Bạn cũng có thể phát triển các toán tử dựa trên đặc điểm riêng của mình và sử dụng chúng trong dự án của mình. Ví dụ: chúng tôi đã tạo MongoDBToHiveViaHdfsTransfer, một toán tử để xuất tài liệu từ MongoDB sang Hive và một số toán tử để làm việc với ClickNhà: CHLoadFromHiveOperator và CHTableLoaderOperator. Về cơ bản, ngay khi một dự án thường xuyên sử dụng mã được xây dựng trên các câu lệnh cơ bản, bạn có thể nghĩ đến việc xây dựng nó thành một câu lệnh mới. Điều này sẽ đơn giản hóa việc phát triển hơn nữa và bạn sẽ mở rộng thư viện toán tử của mình trong dự án.

Tiếp theo, tất cả các phiên bản nhiệm vụ này cần được thực thi và bây giờ chúng ta sẽ nói về bộ lập lịch.

Người lập kế hoạch

Bộ lập lịch tác vụ của Airflow được xây dựng trên Cần tây. Celery là một thư viện Python cho phép bạn sắp xếp hàng đợi cộng với việc thực thi các tác vụ không đồng bộ và phân tán. Về phía Airflow, tất cả các nhiệm vụ được chia thành các nhóm. Bể bơi được tạo thủ công. Thông thường, mục đích của chúng là giới hạn khối lượng công việc khi làm việc với nguồn hoặc để điển hình hóa các nhiệm vụ trong DWH. Bể bơi có thể được quản lý thông qua giao diện web:

Airflow là một công cụ giúp phát triển và duy trì các quy trình xử lý dữ liệu hàng loạt một cách thuận tiện và nhanh chóng

Mỗi nhóm có giới hạn về số lượng vị trí. Khi tạo DAG, nó được cung cấp một nhóm:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Nhóm được xác định ở cấp DAG có thể bị ghi đè ở cấp nhiệm vụ.
Một quy trình riêng biệt, Trình lập lịch biểu, chịu trách nhiệm lên lịch cho tất cả các tác vụ trong Airflow. Trên thực tế, Trình lập lịch trình xử lý tất cả các cơ chế thiết lập nhiệm vụ để thực thi. Nhiệm vụ trải qua nhiều giai đoạn trước khi được thực thi:

  1. Các nhiệm vụ trước đó đã được hoàn thành trong DAG; một nhiệm vụ mới có thể được xếp hàng đợi.
  2. Hàng đợi được sắp xếp tùy thuộc vào mức độ ưu tiên của nhiệm vụ (mức độ ưu tiên cũng có thể được kiểm soát) và nếu có một vị trí trống trong nhóm, nhiệm vụ có thể được đưa vào hoạt động.
  3. Nếu có cần tây công nhân miễn phí, nhiệm vụ sẽ được gửi đến đó; công việc mà bạn đã lập trình trong bài toán bắt đầu bằng cách sử dụng toán tử này hoặc toán tử khác.

Đủ đơn giản.

Bộ lập lịch chạy trên tập hợp tất cả các DAG và tất cả các tác vụ trong DAG.

Để Scheduler bắt đầu làm việc với DAG, DAG cần đặt lịch:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Có một bộ cài đặt trước được tạo sẵn: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Bạn cũng có thể sử dụng biểu thức cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Ngày thi hành

Để hiểu cách Airflow hoạt động, điều quan trọng là phải hiểu Ngày thực hiện đối với DAG. Trong Airflow, DAG có thứ nguyên Ngày thực hiện, tức là tùy thuộc vào lịch làm việc của DAG, các phiên bản nhiệm vụ được tạo cho mỗi Ngày thực hiện. Và đối với mỗi Ngày thực hiện, các nhiệm vụ có thể được thực hiện lại - hoặc, ví dụ: DAG có thể hoạt động đồng thời trong một số Ngày thực hiện. Điều này được thể hiện rõ ràng ở đây:

Airflow là một công cụ giúp phát triển và duy trì các quy trình xử lý dữ liệu hàng loạt một cách thuận tiện và nhanh chóng

Thật không may (hoặc có thể may mắn thay: tùy thuộc vào tình huống), nếu việc triển khai nhiệm vụ trong DAG được sửa chữa thì việc thực thi trong Ngày thực hiện trước đó sẽ tiếp tục có tính đến các điều chỉnh. Điều này tốt nếu bạn cần tính toán lại dữ liệu trong các giai đoạn trước bằng thuật toán mới, nhưng sẽ không tốt vì khả năng tái tạo của kết quả bị mất (tất nhiên, không ai làm phiền bạn trả lại phiên bản mã nguồn được yêu cầu từ Git và tính toán những gì bạn cần một lần, theo cách bạn cần).

Tạo nhiệm vụ

Việc triển khai DAG là mã bằng Python, vì vậy chúng tôi có một cách rất thuận tiện để giảm số lượng mã khi làm việc, chẳng hạn như với các nguồn phân đoạn. Giả sử bạn có ba phân đoạn MySQL làm nguồn, bạn cần truy cập vào từng phân đoạn và lấy một số dữ liệu. Hơn nữa, độc lập và song song. Mã Python trong DAG có thể trông như thế này:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG trông như thế này:

Airflow là một công cụ giúp phát triển và duy trì các quy trình xử lý dữ liệu hàng loạt một cách thuận tiện và nhanh chóng

Trong trường hợp này, bạn có thể thêm hoặc xóa phân đoạn bằng cách điều chỉnh cài đặt và cập nhật DAG. Thoải mái!

Bạn cũng có thể sử dụng cách tạo mã phức tạp hơn, chẳng hạn như làm việc với các nguồn ở dạng cơ sở dữ liệu hoặc mô tả cấu trúc bảng, thuật toán để làm việc với bảng và, có tính đến các tính năng của cơ sở hạ tầng DWH, tạo ra một quy trình để tải N bảng vào bộ lưu trữ của bạn. Hoặc, ví dụ: làm việc với API không hỗ trợ làm việc với tham số ở dạng danh sách, bạn có thể tạo N tác vụ trong DAG từ danh sách này, hạn chế tính song song của các yêu cầu trong API thành một nhóm và loại bỏ dữ liệu cần thiết từ API. Linh hoạt!

kho

Airflow có kho lưu trữ phụ trợ riêng, một cơ sở dữ liệu (có thể là MySQL hoặc Postgres, chúng tôi có Postgres), lưu trữ trạng thái của các tác vụ, DAG, cài đặt kết nối, biến toàn cục, v.v., v.v. Ở đây tôi muốn tôi có thể nói rằng kho lưu trữ trong Airflow rất đơn giản (khoảng 20 bảng) và thuận tiện nếu bạn muốn xây dựng bất kỳ quy trình nào của riêng mình trên đó. Tôi nhớ 100500 bảng trong kho Informatica đã phải nghiên cứu rất lâu trước khi hiểu cách xây dựng truy vấn.

Giám sát

Do tính đơn giản của kho lưu trữ, bạn có thể xây dựng quy trình giám sát tác vụ thuận tiện cho mình. Chúng tôi sử dụng sổ ghi chú trong Zeppelin, nơi chúng tôi xem trạng thái của các nhiệm vụ:

Airflow là một công cụ giúp phát triển và duy trì các quy trình xử lý dữ liệu hàng loạt một cách thuận tiện và nhanh chóng

Đây cũng có thể là giao diện web của Airflow:

Airflow là một công cụ giúp phát triển và duy trì các quy trình xử lý dữ liệu hàng loạt một cách thuận tiện và nhanh chóng

Mã Airflow là mã nguồn mở nên chúng tôi đã thêm cảnh báo vào Telegram. Mỗi phiên bản đang chạy của một tác vụ, nếu xảy ra lỗi, sẽ gửi thư rác vào nhóm trong Telegram, nơi bao gồm toàn bộ nhóm phát triển và hỗ trợ.

Chúng tôi nhận được phản hồi nhanh chóng thông qua Telegram (nếu được yêu cầu) và thông qua Zeppelin, chúng tôi nhận được bức tranh tổng thể về các nhiệm vụ trong Airflow.

trong tổng số

Luồng khí chủ yếu là nguồn mở và bạn không nên mong đợi điều kỳ diệu từ nó. Hãy chuẩn bị dành thời gian và công sức để xây dựng một giải pháp hiệu quả. Mục tiêu có thể đạt được, tin tôi đi, nó đáng giá. Tốc độ phát triển, tính linh hoạt, dễ dàng thêm các quy trình mới - bạn sẽ thích nó. Tất nhiên, bạn cần chú ý nhiều đến việc tổ chức dự án, tính ổn định của Airflow: phép màu không xảy ra.

Bây giờ chúng tôi có Airflow hoạt động hàng ngày khoảng 6,5 nghìn nhiệm vụ. Họ khá khác nhau về tính cách. Có nhiệm vụ tải dữ liệu vào DWH chính từ nhiều nguồn khác nhau và rất cụ thể, có nhiệm vụ tính toán mặt tiền cửa hàng bên trong DWH chính, có nhiệm vụ xuất dữ liệu vào DWH nhanh, có rất nhiều nhiệm vụ khác nhau - và Airflow nhai tất cả chúng ngày này qua ngày khác. Nói về số lượng thì đây là 2,3 nghìn Các tác vụ ELT có độ phức tạp khác nhau trong DWH (Hadoop), khoảng. 2,5 trăm cơ sở dữ liệu nguồn, đây là một đội từ 4 nhà phát triển ETL, được chia thành xử lý dữ liệu ETL trong xử lý dữ liệu DWH và ELT bên trong DWH và tất nhiên là hơn thế nữa một quản trị viên, người xử lý cơ sở hạ tầng của dịch vụ.

Kế hoạch cho tương lai

Số lượng quy trình chắc chắn sẽ tăng lên và điều chính chúng tôi sẽ làm về cơ sở hạ tầng Airflow là mở rộng quy mô. Chúng tôi muốn xây dựng một cụm Airflow, phân bổ một đôi chân cho công nhân Celery và tạo ra một cái đầu tự sao chép với các quy trình lập kế hoạch công việc và một kho lưu trữ.

Phần kết

Tất nhiên, đây không phải là tất cả những gì tôi muốn kể về Airflow, nhưng tôi đã cố gắng nêu bật những điểm chính. Ăn đi là thèm, hãy thử và bạn sẽ thích :)

Nguồn: www.habr.com

Thêm một lời nhận xét