Apache Airflow:让 ETL 更简单

大家好,我是 Dmitry Logvinenko - Vezet 集团公司分析部门的数据工程师。

我将向您介绍一个用于开发 ETL 流程的出色工具 - Apache Airflow。 但 Airflow 的用途如此广泛且多面,即使您不参与数据流,但需要定期启动任何进程并监控其执行情况,您也应该仔细研究它。

是的,我不仅会告诉,还会展示:该程序有很多代码、屏幕截图和建议。

Apache Airflow:让 ETL 更简单
当你用 Google 搜索 Airflow / Wikimedia Commons 这个词时,你通常会看到什么

目录

介绍

Apache Airflow 就像 Django:

  • 用Python写的
  • 有一个优秀的管理员,
  • 无限扩展

- 只有更好,它是为了完全不同的目的而制作的,即(因为它写在 kat 之前):

  • 在无限数量的机器上启动和监控任务(尽可能多的 Celery/Kubernetes 和你的良心允许的数量)
  • 通过非常容易编写和理解的 Python 代码生成动态工作流程
  • 以及使用现成的组件和自制插件将任何数据库和 API 相互连接的能力(这非常简单)。

我们这样使用 Apache Airflow:

  • 我们从 DWH 和 ODS(我们有 Vertica 和 Clickhouse)中的各种来源(许多 SQL Server 和 PostgreSQL 实例、具有应用程序指标的各种 API,甚至 1C)收集数据。
  • 先进 cron,它启动 ODS 上的数据整合流程,并监控其维护。

直到最近,一台具有 32 个内核和 50 GB RAM 的小型服务器才能满足我们的需求。 在 Airflow 中,这是有效的:

  • 更多 200天 (实际上是工作流程,我们在其中填充任务),
  • 平均每个 70 项任务,
  • 这种美好开始了(也是平均而言) 每小时一次.

我将在下面写下我们如何扩展,但现在让我们定义我们将解决的超级任务:

有三个源 SQL Server,每个都有 50 个数据库 - 分别是一个项目的实例,它们具有相同的结构(几乎到处都是,哈哈),这意味着每个都有一个 Orders 表(幸运的是,有一个包含该表的表)名称可以推入任何业务)。 我们通过添加服务字段(源服务器、源数据库、ETL 任务 ID)来获取数据,然后天真地将它们扔到 Vertica 中。

走吧!

这部分是基础的、实用的(还有一点理论)

为什么我们(和你)

当树很大而我很单纯的时候 SQL-schik 在一家俄罗斯零售店中,我们使用两种可用的工具来欺骗 ETL 流程(又名数据流):

  • Informatica 电源中心 - 一个极其广泛的系统,极其高效,拥有自己的硬件,自己的版本控制。 我只使用了上帝保佑其能力的 1%。 为什么? 嗯,首先,这个 380 年代的界面给我们带来了精神上的压力。 其次,这个装置是为极其奇特的流程、激烈的组件重用和其他非常重要的企业技巧而设计的。 关于它的成本,例如空客 AXNUMX 的机翼/年,我们不会说什么。

    请注意,截图可能会对 30 岁以下的人造成一点伤害

    Apache Airflow:让 ETL 更简单

  • SQL Server 集成服务器 - 我们在内部项目流程中使用了这个人。 嗯,事实上:我们已经使用了 SQL Server,不使用它的 ETL 工具在某种程度上是不合理的。 它的一切都很好:界面漂亮,进度报告......但这不是我们喜欢软件产品的原因,哦,不是因为这个。 版本化 dtsx (这是保存时混合节点的 XML)我们可以,但是有什么意义呢? 制作一个任务包将一百个表从一台服务器拖到另一台服务器怎么样? 为什么,一百个、二十个会让你点击鼠标按钮时食指掉下来。 但它看起来确实更时尚:

    Apache Airflow:让 ETL 更简单

我们当然在寻找出路。 案例连 几乎 来到了一个自写的SSIS包生成器...

...然后我找到了一份新工作。 Apache Airflow 在这方面超越了我。

当我发现ETL流程描述都是简单的Python代码时,我简直高兴得手舞足蹈。 这就是数据流的版本控制和差异化方式,将具有单一结构的表从数百个数据库倒入一个目标中,只需在一个半或两个 13 英寸屏幕上编写 Python 代码即可。

组装集群

我们不要安排一个完全的幼儿园,也不要在这里讨论完全显而易见的事情,比如安装 Airflow、您选择的数据库、Celery 以及码头中描述的其他案例。

为了让我们可以立即开始实验,我勾勒出 docker-compose.yml 其中:

  • 我们来提高一下实际情况 气流:调度程序、网络服务器。 Flower 还将在那里运行以监视 Celery 任务(因为它已经被推送到 apache/airflow:1.10.10-python3.7,我们不介意);
  • PostgreSQL的,其中Airflow将写入其服务信息(调度程序数据、执行统计信息等),Celery将标记已完成的任务;
  • Redis的,他将充当 Celery 的任务代理;
  • 芹菜工人,谁将直接执行任务。
  • 到文件夹 ./dags 我们将添加包含 dags 描述的文件。 它们将被即时拾取,因此无需在每次打喷嚏后处理整个堆栈。

在某些地方,示例中的代码没有完整给出(以免文本混乱),并且在某些地方,代码在此过程中进行了修改。 完整的工作代码示例可以在存储库中找到 https://github.com/dm-logv/airflow-tutorial.

泊坞窗,compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

注:

  • 在构图的组装上,我很大程度上依赖于众所周知的图像 puckel/docker-气流 - 一定要检查一下。 也许你的生活中不需要任何其他东西。
  • 所有气流设置不仅可以通过 airflow.cfg,而且还通过环境变量(开发人员的荣耀),我恶意地利用了它。
  • 当然,它还没有做好生产准备:我故意没有在容器上放置心跳,也没有考虑安全性。 但我做了适合我们实验者的最低限度的事情。
  • 注意:
    • 调度程序和工作人员都应该可以访问带有 dags 的文件夹。
    • 这同样适用于所有第三方库 - 它们都必须安装在具有调度程序和工作人员的机器上。

好吧,现在很简单:

$ docker-compose up --scale worker=3

一切都升起来之后,可以看一下web界面:

基本概念

如果你不明白所有这些“dags”中的任何内容,那么这里有一本简短的字典:

  • 调度 - Airflow 中最重要的人,他确保机器人而不是人努力工作:他监控时间表、更新数据、运行任务。

    一般来说,在旧版本中,他有内存问题(不,不是健忘症,而是泄漏),遗留参数甚至保留在配置中 run_duration — 其重启间隔。 但现在一切都很好。

  • DAG (又名“dag”)是一个“有向无环图”,但这样的定义对任何人来说都没有什么意义,但本质上它是一个相互交互的任务的容器(见下文)或 SSIS 和 Workflow 中的 Package 的类似物在 Informatica 中。

    除了 dags 之外,可能仍然存在 subdags,但我们很可能不会接触到它们。

  • 有向无环运行 — 一个初始化的 dag,它被分配了自己的 execution_date。 同一 dag 的 Dagrans 可以并行工作(当然,如果您使任务具有幂等性)。
  • 操作者 - 这些是负责执行特定操作的代码片段。 运算符分为三种类型:
    • 行动,就像我们所爱的人 PythonOperator,它可以执行任何(有效的)Python 代码;
    • 转让,将数据从一个地方传输到另一个地方,比如说 MsSqlToHiveTransfer;
    • 传感器 它还允许您在任何事件发生之前做出反应或减慢 dag 的进一步执行。 HttpSensor 可以拉取指定的端点,当收到所需的响应时,开始传输 GoogleCloudStorageToS3Operator。 好奇的人会问:“为什么? 毕竟,您可以直接在操作符中进行重复操作!” 然后,以免操作员卡住而堵塞任务池。 传感器启动、测试并停止,直到下一次尝试。
  • 任务 — 声明的运算符,无论类型如何,并附加到 dag,都会提升到任务级别。
  • 任务实例 - 当总规划者决定是时候将任务发送给表演者工人时(就在现场,如果我们使用 LocalExecutor 或在以下情况下到远程节点 CeleryExecutor),它为它们分配一个上下文(即一组变量 - 执行参数),扩展命令或请求模板并将它们放入池中。

生成任务

首先,让我们概述一下 doug 的总体方案,然后我们将越来越深入地了解细节,因为我们应用了一些重要的解决方案。

因此,以最简单的形式,这样的 dag 将如下所示:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

让我们弄清楚:

  • 首先,导入所需的库并 别的东西;
  • sql_server_ds - List[namedtuple[str, str]] 包含来自 Airflow Connections 的连接名称以及我们将从中获取板块的数据库;
  • dag - 我们的公告,必须在 globals(),否则Airflow将找不到它。 道格还需要说:
    • 他叫什么名字 orders - 该名称随后将出现在网络界面中,
    • 他将从七月八日午夜开始工作,
    • 它应该运行一次,大约每 6 小时运行一次(对于这里的硬汉来说,而不是 timedelta() 允许的 cron-线 0 0 0/6 ? * * *,对于不太酷的人 - 像这样的表达 @daily);
  • workflow() 将完成主要工作,但不是现在。 现在我们只需将上下文转储到日志中。
  • 现在创建任务的简单魔力:
    • 我们浏览我们的资料来源;
    • 初始化 PythonOperator,这将执行我们的虚拟对象 workflow()。 不要忘记指定任务的唯一(在 dag 内)名称并绑定 dag 本身。 旗帜 provide_context 反过来,会将额外的参数倒入函数中,我们将使用这些参数仔细收集 **context.

现在就这样了。 我们得到了什么:

  • 网络界面中的新 dag,
  • 将并行执行一百个任务(如果 Airflow、Celery 设置和服务器容量允许)。

嗯,差不多明白了。

Apache Airflow:让 ETL 更简单
谁来安装依赖项?

为了简化整个事情,我搞砸了 docker-compose.yml 处理 requirements.txt 在所有节点上。

现在我们开始吧:

Apache Airflow:让 ETL 更简单

灰色方块是调度程序处理的任务实例。

我们稍等一下,任务就被工人抢走了:

Apache Airflow:让 ETL 更简单

当然,绿色的已经成功完成了他们的工作。 红军并不是很成功。

顺便说一句,我们的产品上没有文件夹 ./dags,机器之间没有同步 - 所有 dags 都位于 git 在我们的 Gitlab 上,Gitlab CI 在合并时将更新分发到机器 master.

关于花的一些事

当工人们在敲打我们的奶嘴时,让我们记住另一个可以向我们展示一些东西的工具——花。

第一页包含工作节点的摘要信息:

Apache Airflow:让 ETL 更简单

发送工作任务最饱和的页面:

Apache Airflow:让 ETL 更简单

最无聊的页面显示我们经纪人的状态:

Apache Airflow:让 ETL 更简单

最亮的页面是任务状态图及其执行时间:

Apache Airflow:让 ETL 更简单

我们加载未加载的

至此,所有的任务已经完成,伤员可以被抬走了。

Apache Airflow:让 ETL 更简单

由于这样或那样的原因,还有许多人受伤。 在正确使用Airflow的情况下,这些方块表明数据肯定没有到达。

您需要查看日志并重新启动失败的任务实例。

通过单击任何方块,我们将看到可用的操作:

Apache Airflow:让 ETL 更简单

你可以带走并清理掉倒下的人。 也就是说,我们忘记了那里发生了一些故障,并且相同的实例任务将转到调度程序。

Apache Airflow:让 ETL 更简单

显然,用带有所有红色方块的鼠标来执行此操作不太人性化 - 这不是我们对 Airflow 的期望。 当然,我们有大规模杀伤性武器: Browse/Task Instances

Apache Airflow:让 ETL 更简单

让我们一次选择所有内容并将其重置为零,单击正确的项目:

Apache Airflow:让 ETL 更简单

清洁后,我们的出租车看起来像这样(它们已经在等待调度程序调度它们):

Apache Airflow:让 ETL 更简单

连接、挂钩和其他变量

是时候看看下一个 DAG 了, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

大家都做过报告更新吗? 这又是她:有一个获取数据的来源列表; 有一个清单可以放在哪里; 当一切发生或损坏时,不要忘记按喇叭(好吧,这与我们无关,不)。

让我们再次浏览一下该文件,看看新的晦涩难懂的东西:

  • from commons.operators import TelegramBotSendMessage - 没有什么可以阻止我们创建自己的运算符,我们通过制作一个小型包装器来向 Unblocked 发送消息来利用它。 (我们将在下面详细讨论这个运算符);
  • default_args={} — dag 可以将相同的参数分配给它的所有操作员;
  • to='{{ var.value.all_the_kings_men }}' - 场地 to 我们不会进行硬编码,而是使用 Jinja 和带有电子邮件列表的变量动态生成,我小心地将其放入 Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — 运营商启动条件。 在我们的例子中,只有满足所有依赖关系,这封信才会发送给老板 顺利;
  • tg_bot_conn_id='tg_main' - 论点 conn_id 接受我们创建的连接的标识符 Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED — 只有当任务掉落时,Telegram 中的消息才会飞走;
  • task_concurrency=1 — 我们禁止同时启动一项任务的多个任务实例。 否则,我们将同时启动多个 VerticaOperator (看着一张桌子);
  • report_update >> [email, tg] -全部 VerticaOperator 集中发送信件和消息,如下所示:
    Apache Airflow:让 ETL 更简单

    但由于通知运营商的启动条件不同,因此只有一种可以工作。 在树视图中,一切看起来都不太清晰:
    Apache Airflow:让 ETL 更简单

我简单说几句 和他们的朋友—— 变量.

宏是 Jinja 占位符,可以将各种有用的信息替换为运算符参数。 例如,像这样:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} 将扩展到上下文变量的内容 execution_date 格式 YYYY-MM-DD: 2020-07-14。 最好的部分是上下文变量被固定到特定的任务实例(树视图中的一个正方形),并且当重新启动时,占位符将扩展为相同的值。

可以使用每个任务实例上的“渲染”按钮查看分配的值。 发送信件的任务如下所示:

Apache Airflow:让 ETL 更简单

因此,在发送消息的任务中:

Apache Airflow:让 ETL 更简单

此处提供了最新可用版本的内置宏的完整列表: 宏参考

此外,借助插件,我们可以声明自己的宏,但那是另一回事了。

除了预定义的东西之外,我们还可以替换变量的值(我已经在上面的代码中使用了它)。 让我们创建在 Admin/Variables 有几件事:

Apache Airflow:让 ETL 更简单

就是这样,您可以使用:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

该值可以是标量,也可以是 JSON。 如果是 JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

只需使用所需密钥的路径: {{ var.json.bot_config.bot.token }}.

我会逐字逐句地说一个词并展示一张屏幕截图 连接。 这里的一切都是基本的:在页面上 Admin/Connections 我们创建一个连接,在其中添加我们的登录名/密码和更具体的参数。 像这样:

Apache Airflow:让 ETL 更简单

密码可以加密(比默认设置更彻底),或者您可以省略连接类型(就像我所做的那样) tg_main) - 事实上,类型列表是硬连线到 Airflow 模型中的,如果不进入源代码就无法扩展(如果我突然没有用 Google 搜索某些内容,请纠正我),但没有什么可以阻止我们简单地获得积分姓名。

您还可以使用相同的名称建立多个连接:在这种情况下,该方法 BaseHook.get_connection(),它通过名称为我们提供联系,将给出 随机的 来自几个同名的人(进行循环会更合乎逻辑,但让我们把它留给 Airflow 开发人员的良心)。

变量和连接当然是很酷的工具,但重要的是不要失去将流的哪些部分存储在代码中以及将哪些部分提供给 Airflow 进行存储的平衡。 一方面,可以通过UI方便地快速更改某个值,例如邮箱。 另一方面,这仍然是对鼠标点击的回归,我们(我)想要摆脱它。

使用连接是任务之一 钩子。 一般来说,Airflow hooks 是将其连接到第三方服务和库的点。 例如, JiraHook 将为我们打开一个客户端来与 Jira 交互(您可以来回移动任务),并在帮助下 SambaHook 您可以将本地文件推送到 smb-观点。

解析自定义运算符

我们差点就看到它是如何制作的 TelegramBotSendMessage

代码 commons/operators.py 与实际操作员:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

在这里,就像 Airflow 中的其他所有内容一样,一切都非常简单:

  • 继承自 BaseOperator,它实现了很多特定于 Airflow 的东西(闲暇时查看一下)
  • 宣布领域 template_fields,其中 Jinja 将查找要处理的宏。
  • 安排了正确的论据 __init__(),在必要时放置默认值。
  • 他们也没有忘记初始化祖先。
  • 打开对应的hook TelegramBotHook,从中收到一个客户端对象。
  • 重写(重新定义)方法 BaseOperator.execute(),当启动操作员时,Airfow 会抽动 - 在其中我们执行主要操作,而不会忘记登录。 (顺便说一句,我们直接登录 stdout и stderr - 气流会拦截一切,将其完美包裹,并在必要时将其分解。)

让我们看看我们有什么 commons/hooks.py。 文件的第一部分,带有钩子本身:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

我什至不知道要解释什么,我只记下重点:

  • 我们继承并思考论点 - 在大多数情况下它将是一个: conn_id;
  • 超越标准方法:我限制了自己 get_conn(),其中我按名称获取连接参数并仅获取该部分 extra (这是一个 JSON 字段),我(根据我自己的指示!)在其中放入 Telegram 机器人令牌: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • 我创建了一个我们的实例 TelegramBot,给它一个特定的标记。

就这样。 您可以使用以下方式从钩子中获取客户端 TelegramBotHook().clent или TelegramBotHook().get_conn().

文件的第二部分,我为 Telegram REST API 制作了一个微包装器,以免拖拽相同的内容 python-telegram-bot 对于一种方法 sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

正确的方法是将其全部相加: TelegramBotSendMessage, TelegramBotHook, TelegramBot - 放入插件中,将其放入公共存储库中,然后将其提供给开源。

当我们研究这一切时,我们的报告更新成功失败,并向我的频道发送了一条错误消息。 我再去看看有什么问题...

Apache Airflow:让 ETL 更简单
我们的东西坏了! 这不是我们一直在等待的吗? 确切地!

你要倒吗?

你有没有觉得我错过了什么? 看来他答应将数据从SQL Server转移到Vertica,然后他就拿走了并转移话题,流氓!

这次犯罪是故意的,我只是要为你破译一些术语。 现在你可以走得更远。

我们的计划是这样的:

  1. 做达格
  2. 生成任务
  3. 看看一切多么美丽
  4. 为填充分配会话编号
  5. 从 SQL Server 获取数据
  6. 将数据放入Vertica
  7. 收集统计数据

因此,为了让这一切正常运行,我对我们的 docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

在那里我们提出:

  • Vertica 作为主机 dwh 使用最默认的设置,
  • SQL Server 的三个实例,
  • 我们用一些数据填充后者的数据库(在任何情况下都不要查看 mssql_init.py!)

我们使用比上次稍微复杂的命令来启动所有内容:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

我们的奇迹随机发生器生成的内容可以使用该项目完成 Data Profiling/Ad Hoc Query:

Apache Airflow:让 ETL 更简单
最主要的是不要向分析师展示

详细说明 ETL 会话 我不会,那里的一切都很琐碎:我们制作一个基础,里面有一个标志,我们用上下文管理器包装所有内容,现在我们这样做:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

会话文件

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

时机已到 收集我们的数据 从我们的一百五十张桌子中。 让我们借助非常朴实的台词来做到这一点:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. 使用我们从 Airflow 获得的钩子 pymssql-连接
  2. 让我们将日期形式的约束插入到请求中 - 模板引擎会将其放入函数中。
  3. 满足我们的要求 pandas谁会得到我们 DataFrame - 这对我们将来会有用。

我正在使用替换 {dt} 而不是请求参数 %s 不是因为我是邪恶的匹诺曹,而是因为 pandas 无法应付 pymssql 并滑落最后一个 params: List虽然他真的想要 tuple.
另请注意,开发商 pymssql 决定不再支持他,是时候搬出去了 pyodbc.

让我们看看 Airflow 在函数的参数中填充了什么:

Apache Airflow:让 ETL 更简单

如果没有数据,那么就没有继续下去的意义。 但认为填充成功也很奇怪。 但这并不是一个错误。 啊啊啊啊,怎么办?! 这是什么:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException Airflow 会告诉您实际上没有错误,但我们正在跳过该任务。 界面不会有绿色或红色方块,而是粉红色。

让我们扔掉我们的数据 几列:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

这些是:

  • 我们从中接受订单的数据库,
  • 我们的上传会话的 ID(会有所不同) 对于每项任务),
  • 来自源和订单 ID 的哈希值 - 这样在最终数据库(所有内容都倒入一个表中)我们就有一个唯一的订单 ID。

倒数第二步仍然是:将所有内容倒入 Vertica。 而且,奇怪的是,最引人注目、最有效的方法之一就是通过 CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. 我们正在制作一个特殊的接收器 StringIO.
  2. pandas 请把我们的放进去 DataFrameCSV- 线。
  3. 让我们用钩子打开与我们最喜欢的 Vertica 的连接。
  4. 现在在帮助下 copy() 将我们的数据直接发送到 Vertika!

我们将从驱动程序中获取已填充的行数,并告诉会话管理器一切正常:

session.loaded_rows = cursor.rowcount
session.successful = True

这就是全部。

在销售时,我们手动创建目标板。 在这里我给自己准备了一台小机器:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

我在用 VerticaOperator() 我创建一个数据库模式和一个表(当然,如果它们尚不存在)。 最主要的是正确安排依赖关系:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

总结

- 嗯, - 小老鼠说, - 不是吗?
你确信我是森林里最可怕的动物吗?

朱莉娅·唐纳森《咕噜牛》

我想如果我和我的同事进行一场竞赛:谁会从头开始快速创建并启动 ETL 流程:他们使用 SSIS 和鼠标,而我使用 Airflow...然后我们还会比较维护的难易程度...哇,我想你会同意我会在所有方面击败他们!

如果更认真一点的话,那么 Apache Airflow - 通过以程序代码的形式描述流程 - 完成了我的工作 更方便、更愉快。

其无限的可扩展性:无论是在插件方面还是在可扩展性方面 - 让您有机会在几乎任何领域使用 Airflow:甚至在收集、准备和处理数据的整个周期中,甚至在发射火箭(当然是到火星)时。

最后部分、参考和信息

我们为您收集的耙子

  • start_date。 是的,这已经是当地的模因了。 通过道格的主要论点 start_date 每个人都通过。 简而言之,如果您指定 start_date 当前日期,以及 schedule_interval - 总有一天,DAG 将不早于明天启动。
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    再也没有问题了。

    还有另一个与之相关的运行时错误: Task is missing the start_date parameter,这通常表明您忘记绑定到 dag 运算符。

  • 一切都在一台机器上。 是的,还有数据库(Airflow 本身和我们的涂层)、网络服务器、调度程序和工作人员。 它甚至起作用了。 但随着时间的推移,服务的任务数量不断增加,当 PostgreSQL 开始在 20 秒而不是 5 毫秒内响应索引时,我们就把它带走了。
  • 本地执行器。 是的,我们还坐在上面,我们已经来到了深渊的边缘。 到目前为止,LocalExecutor 对我们来说已经足够了,但现在是时候扩展至少一个工作线程了,我们必须更加努力地迁移到 CeleryExecutor。 而且由于您可以在一台机器上使用它,因此即使在服务器上也没有什么可以阻止您使用 Celery,“老实说,服务器自然永远不会投入生产!”
  • 不使用 内置工具:
    • 连接 存储服务凭证,
    • SLA 未命中 响应未能按时完成的任务,
    • 星康 交换元数据(我说 数据!)在 dag 任务之间。
  • 邮件滥用。 好吧,我能说什么? 为所有重复的失败任务设置了警报。 现在我的工作 Gmail 有超过 90 万封来自 Airflow 的电子邮件,而网络邮件枪口拒绝一次接收和删除超过 100 封电子邮件。

更多陷阱: Apache Airflow 的缺陷

实现更高程度自动化的手段

为了让我们更多地用头脑而不是用手工作,Airflow 为我们准备了以下内容:

  • REST API - 它仍然处于实验状态,这并不妨碍它工作。 在它的帮助下,您不仅可以接收有关 dag 和任务的信息,还可以停止/启动 dag、创建 DAG 运行或池。
  • CLI — 许多工具可以通过命令行使用,但通过 WebUI 使用起来不仅不方便,而且完全不存在。 例如:
    • backfill 需要重新启动任务实例。
      比如,分析师来了,说:“同志,你1月13日到XNUMX日的数据都是胡说八道! 修好,修好,修好,修好!” 而你就是这样一个滚刀:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • 基础服务: initdb, resetdb, upgradedb, checkdb.
    • run,它允许您启动一个任务实例,甚至忘记所有依赖项。 此外,您可以通过以下方式运行它 LocalExecutor,即使您有一个 Celery 集群。
    • 做几乎相同的事情 test,也只是在基地中什么也不写。
    • connections 允许从 shell 批量创建连接。
  • Python API - 一种相当核心的交互方式,适用于插件,而不是用小手蜂拥而至。 但谁能阻止我们去 /home/airflow/dags, 跑步 ipython 并开始胡闹? 例如,您可以使用以下代码导出所有连接:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • 连接到 Airflow 元数据数据库。 我不建议写入它,但您可以比通过任何 API 更快更轻松地获取各种特定指标的任务状态。

    假设并非所有任务都是幂等的,但它们有时会失败,这是正常的。 但一些瓦砾已经很可疑,我们需要检查一下。

    当心,SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

引用

当然,来自 Google 的前十个链接是我书签中 Airflow 文件夹的内容。

以及文章中使用的链接:

来源: habr.com