Airflow是一个方便快速开发和维护批量数据处理流程的工具

Airflow是一个方便快速开发和维护批量数据处理流程的工具

你好,哈布尔! 在本文中,我想讨论一个用于开发批量数据处理流程的出色工具,例如在企业 DWH 或 DataLake 的基础设施中。 我们就来说说Apache Airflow(以下简称Airflow)。 Habré 被不公平地剥夺了关注,在主要部分中,我将尝试说服您,在为您的 ETL/ELT 流程选择调度程序时,至少 Airflow 值得关注。

此前,我在 Tinkoff 银行工作时写过一系列有关 DWH 主题的文章。 现在我已经成为 Mail.Ru Group 团队的一员,正在开发一个游戏领域的数据分析平台。 实际上,随着新闻和有趣的解决方案的出现,我和我的团队将在这里讨论我们的数据分析平台。

序幕

那么,让我们开始吧。 什么是气流? 这是一个图书馆(或 库集) 开发、规划和监控工作流程。 Airflow的主要特点:使用Python代码来描述(开发)流程。 这对于组织您的项目和开发有很多优势:本质上,您的(例如)ETL 项目只是一个 Python 项目,您可以按照您的意愿组织它,同时考虑基础设施的具体情况、团队规模和其他需求。 从工具上讲,一切都很简单。 例如使用 PyCharm + Git。 真是太棒了,而且非常方便!

现在让我们看看Airflow的主要实体。 通过了解它们的本质和目的,您可以最佳地组织您的流程架构。 或许最主要的实体就是有向无环图(以下简称DAG)。

DAG

DAG 是您想要根据特定时间表以严格定义的顺序完成的任务的一些有意义的关联。 Airflow 提供了一个方便的 Web 界面来与 DAG 和其他实体一起使用:

Airflow是一个方便快速开发和维护批量数据处理流程的工具

DAG 可能如下所示:

Airflow是一个方便快速开发和维护批量数据处理流程的工具

开发人员在设计 DAG 时,会制定一组运算符,在这些运算符上构建 DAG 内的任务。 这里我们来到另一个重要的实体:Airflow Operator。

运营商

操作员是创建作业实例的基础上的实体,它描述了作业实例执行期间将发生的情况。 Airflow 从 GitHub 发布 已经包含一组可供使用的运算符。 例子:

  • BashOperator - 用于执行 bash 命令的运算符。
  • PythonOperator - 用于调用 Python 代码的运算符。
  • EmailOperator — 发送电子邮件的运算符。
  • HTTPOperator - 用于处理 http 请求的运算符。
  • SqlOperator - 用于执行 SQL 代码的运算符。
  • 传感器是一个等待事件的操作符(所需时间的到来、所需文件的出现、数据库中的一行、API 的响应等)。

还有更具体的操作符:DockerOperator、HiveOperator、S3FileTransferOperator、PrestoToMysqlOperator、SlackOperator。

您还可以根据自己的特点开发算子并在项目中使用。 例如,我们创建了 MongoDBToHiveViaHdfsTransfer,一个用于将文档从 MongoDB 导出到 Hive 的运算符,以及几个用于使用的运算符 点击之家:CHLoadFromHiveOperator 和 CHTableLoaderOperator。 本质上,一旦项目中频繁使用基于基本语句构建的代码,您就可以考虑将其构建到新语句中。 这将简化进一步的开发,并且您将扩展项目中的运算符库。

接下来,所有这些任务实例都需要执行,现在我们将讨论调度程序。

调度器

Airflow 的任务调度程序构建于 芹菜。 Celery 是一个 Python 库,允许您组织队列以及任务的异步和分布式执行。 在 Airflow 方面,所有任务都被划分到池中。 池是手动创建的。 通常,它们的目的是限制使用源的工作量或典型化 DWH 中的任务。 池可以通过 Web 界面进行管理:

Airflow是一个方便快速开发和维护批量数据处理流程的工具

每个池都有插槽数量限制。 创建 DAG 时,会为其指定一个池:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

在 DAG 级别定义的池可以在任务级别覆盖。
一个单独的进程 Scheduler 负责调度 Airflow 中的所有任务。 实际上,调度程序处理设置要执行的任务的所有机制。 该任务在执行之前会经历几个阶段:

  1. DAG 中之前的任务已完成;新的任务可以排队。
  2. 队列根据任务的优先级进行排序(优先级也可以控制),如果池中有空闲槽,则可以将任务投入运行。
  3. 如果有空闲的worker celery,则将任务发送给它; 您在问题中编写的工作开始,使用一个或另一个运算符。

很简单。

调度程序在所有 DAG 集合以及 DAG 内的所有任务上运行。

为了使 Scheduler 开始与 DAG 一起工作,DAG 需要设置一个时间表:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

有一组现成的预设: @once, @hourly, @daily, @weekly, @monthly, @yearly.

您还可以使用 cron 表达式:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

执行日期

要了解 Airflow 的工作原理,了解 DAG 的执行日期非常重要。 在 Airflow 中,DAG 有一个执行日期维度,即根据 DAG 的工作计划,为每个执行日期创建任务实例。 对于每个执行日期,可以重新执行任务 - 或者,例如,DAG 可以在多个执行日期同时工作。 这里清楚地显示了这一点:

Airflow是一个方便快速开发和维护批量数据处理流程的工具

不幸的是(或者幸运的是:这取决于具体情况),如果 DAG 中任务的实施得到纠正,则前一个执行日期的执行将考虑调整后继续进行。 如果你需要使用新的算法重新计算过去一段时间的数据,这很好,但它很糟糕,因为结果的再现性丢失了(当然,没有人打扰你从 Git 返回所需版本的源代码并计算什么)您需要一次,以您需要的方式)。

生成任务

DAG 的实现是 Python 代码,因此我们有一个非常方便的方法来减少工作时的代码量,例如使用分片源。 假设您有三个 MySQL 分片作为源,您需要进入每个分片并获取一些数据。 而且,独立且并行。 DAG 中的 Python 代码可能如下所示:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG 看起来像这样:

Airflow是一个方便快速开发和维护批量数据处理流程的工具

在这种情况下,您只需调整设置并更新 DAG 即可添加或删除分片。 舒服的!

您还可以使用更复杂的代码生成,例如,使用数据库形式的源或描述表结构、使用表的算法,并考虑 DWH 基础设施的功能,生成流程用于将 N 个表加载到您的存储中。 或者,例如,使用不支持使用列表形式的参数的 API,您可以从该列表在 DAG 中生成 N 个任务,将 API 中请求的并行度限制到一个池,然后抓取来自 API 的必要数据。 灵活的!

存储库

Airflow有自己的后端存储库,一个数据库(可以是MySQL或Postgres,我们有Postgres),它存储任务的状态,DAG,连接设置,全局变量等。在这里我想我可以说Airflow 中的存储库非常简单(大约 20 个表),如果您想在其上构建自己的任何流程,也很方便。 我记得Informatica存储库中有100500个表,在了解如何构建查询之前必须研究很长时间。

监控

鉴于存储库的简单性,您可以构建一个对您来说方便的任务监控流程。 我们在 Zeppelin 中使用记事本,在其中查看任务的状态:

Airflow是一个方便快速开发和维护批量数据处理流程的工具

这也可能是 Airflow 本身的 Web 界面:

Airflow是一个方便快速开发和维护批量数据处理流程的工具

Airflow 代码是开源的,因此我们在 Telegram 中添加了警报。 如果发生错误,任务的每个运行实例都会向整个开发和支持团队所在的 Telegram 组发送垃圾邮件。

我们通过 Telegram 收到及时回复(如果需要),并通过 Zeppelin 我们收到 Airflow 中任务的整体情况。

在总

Airflow 主要是开源的,你不应该指望它能带来奇迹。 准备好投入时间和精力来构建有效的解决方案。 目标是可以实现的,相信我,这是值得的。 开发速度、灵活性、添加新流程的便捷性 - 您一定会喜欢的。 当然,您需要非常关注项目的组织、Airflow 本身的稳定性:奇迹不会发生。

现在我们每天都有 Airflow 工作 约 6,5 个任务。 他们的性格截然不同。 有从许多不同且非常具体的来源将数据加载到主 DWH 的任务,有在主 DWH 内计算店面的任务,有将数据发布到快速 DWH 的任务,有很多很多不同的任务 - 以及 Airflow日复一日地咀嚼它们。 用数字来说,这是 数千名2,3 DWH (Hadoop) 中不同复杂度的 ELT 任务,大约 2,5 个数据库 消息来源,这是一支来自 4名ETL开发人员,分为DWH中的ETL数据处理和DWH中的ELT数据处理,当然还有更多 一名管理员,谁负责服务的基础设施。

未来的计划

进程的数量不可避免地在增长,而我们在 Airflow 基础设施方面要做的主要事情是扩展。 我们想要构建一个 Airflow 集群,为 Celery 工作人员分配一对腿,并制作一个具有作业调度流程和存储库的自我复制头。

结语

当然,这并不是我想讲述的有关 Airflow 的全部内容,但我试图强调要点。 食欲是随着吃来的,尝试一下,你会喜欢的:)

来源: habr.com

添加评论