《浮士德》的背景任务,第一部分:简介

《浮士德》的背景任务,第一部分:简介

我怎么就过这样的生活呢?

不久前,我要从事一个高负载项目的后端工作,其中需要组织大量后台任务的定期执行,这些任务具有复杂的计算和对第三方服务的请求。 该项目是异步的,在我来之前,它有一个用于 cron 启动任务的简单机制:一个循环检查当前时间并通过收集启动协程组 - 这种方法被证明是可以接受的,直到有数十个和数百个这样的协程然而,当他们的数量超过两千时,我不得不考虑组织一个正常的任务队列,有一个broker,几个worker等等。

首先,我决定尝试一下我以前用过的 Celery。 由于该项目的异步性质,我深入研究了这个问题并看到 文章以及 项目,由文章作者创建。

我想说的是,这个项目非常有趣,并且在我们团队的其他应用程序中运行得相当成功,而且作者本人表示,他能够通过使用异步池将其投入生产。 但不幸的是,事实证明它并不适合我 问题 通过小组启动任务(参见。 )。 在撰写本文时 问题 已经关闭了,但是工作已经进行了一个月。 无论如何,祝作者好运,一切顺利,因为库上已经有了可用的东西……总的来说,重点在于我,而这个工具对我来说很潮湿。 此外,某些任务对不同的服务有 2-3 个 http 请求,因此即使在优化任务时,我们也会创建 4 个 TCP 连接,大约每 2 小时一次 - 不太好......我想为一种类型的任务创建一个会话启动工人时的任务。 更多关于 aiohttp 的大量请求的信息 这里.

对此,我开始寻找 备择方案 并找到了! 芹菜的创造者,具体来说,据我所知 询问索勒姆,被创建 浮士德,最初用于该项目 robinhood。 Faust 受到 Kafka Streams 的启发,使用 Kafka 作为代理,rocksdb 也用于存储代理工作的结果,最重要的是该库是异步的。

另外,你还可以看看 快速比较 芹菜和浮士德来自后者的创造者:它们的差异,经纪人之间的差异,基本任务的实施。 一切都很简单,然而,faust 中的一个很好的功能引起了人们的注意——用于传输到主题的类型化数据。

我们要做什么?

因此,在一系列简短的文章中,我将向您展示如何使用 Faust 从后台任务收集数据。 顾名思义,我们示例项目的源代码是: 阿尔法优势公司。 我将演示如何编写代理(接收器、主题、分区)、如何定期 (cron) 执行、最方便的 faust cli 命令(单击的包装器)、简单的集群,最后我们将附加一个 datadog (开箱即用)并尝试一些东西来看看。 为了存储收集到的数据,我们将使用 mongodb 和 motor 进行连接。

PS 从关于监控的观点写的信心来看,我认为上一篇文章结尾处的读者仍然会是这样的:

《浮士德》的背景任务,第一部分:简介

项目要求

由于我已经承诺过,让我们列出一个服务应该能够执行的操作的小清单:

  1. 定期上传证券及其概览(包括去年的损益表、资产负债表、现金流量)
  2. 上传历史数据(针对每个交易年度,查找交易收盘价的极值)-定期
  3. 定期上传最新交易数据
  4. 定期上传每个证券的定制指标列表

正如预期的那样,我们从头开始为项目选择一个名称: 霍顿

我们正在准备基础设施

这个标题确实很强大,但是,您需要做的就是为 docker-compose 编写一个小配置,其中包含 kafka(和 Zookeeper - 在一个容器中)、kafdrop(如果我们想查看主题中的消息)、mongodb。 我们得到 [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml)的形式如下:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

这里根本没有什么复杂的。 为 kafka 声明了两个监听器:一个(内部)用于复合网络内部,第二个(外部)用于来自外部的请求,因此它们将其转发到外部。 2181 — 动物园管理员端口。 我想,剩下的事情就很清楚了。

准备项目的框架

在基本版本中,我们项目的结构应该如下所示:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*我注意到的一切 我们还没有触及它,我们只是创建空文件。**

我们创建了一个结构。 现在让我们添加必要的依赖项,编写配置并连接到 mongodb。 我不会提供文章中文件的全文,以免耽误时间,但我会提供必要版本的链接。

让我们从项目的依赖项和元数据开始 - pyproject.toml

接下来,我们开始安装依赖项并创建 virtualenv (或者您可以自己创建 venv 文件夹并激活环境):

pip3 install poetry (если ещё не установлено)
poetry install

现在让我们创建 配置文件 - 凭证和敲门的地方。 您可以立即将 alphavantage 的数据放在那里。 好吧,让我们继续 配置文件 — 从我们的配置中提取应用程序的数据。 是的,我承认,我使用了我的库 - 西迪.

连接到 Mongo 时,一切都非常简单。 宣布 客户类别 连接和 基类 对于cruds,可以更轻松地对集合进行查询。

接下来会发生什么?

文章不是很长,因为这里我只讲动机和准备,所以不要怪我——我保证下一部分会有动作和图形。

因此,在下一部分中我们:

  1. 让我们在 aiohttp 上为 alphavantage 编写一个小型客户端,并请求我们需要的端点。
  2. 让我们创建一个代理来收集证券及其历史价格的数据。

项目代码

这部分的代码

来源: habr.com

添加评论