我怎么就过这样的生活呢?
不久前,我要从事一个高负载项目的后端工作,其中需要组织大量后台任务的定期执行,这些任务具有复杂的计算和对第三方服务的请求。 该项目是异步的,在我来之前,它有一个用于 cron 启动任务的简单机制:一个循环检查当前时间并通过收集启动协程组 - 这种方法被证明是可以接受的,直到有数十个和数百个这样的协程然而,当他们的数量超过两千时,我不得不考虑组织一个正常的任务队列,有一个broker,几个worker等等。
首先,我决定尝试一下我以前用过的 Celery。 由于该项目的异步性质,我深入研究了这个问题并看到
我想说的是,这个项目非常有趣,并且在我们团队的其他应用程序中运行得相当成功,而且作者本人表示,他能够通过使用异步池将其投入生产。 但不幸的是,事实证明它并不适合我
对此,我开始寻找 备择方案 并找到了! 芹菜的创造者,具体来说,据我所知
另外,你还可以看看
我们要做什么?
因此,在一系列简短的文章中,我将向您展示如何使用 Faust 从后台任务收集数据。 顾名思义,我们示例项目的源代码是:
PS 从关于监控的观点写的信心来看,我认为上一篇文章结尾处的读者仍然会是这样的:
项目要求
由于我已经承诺过,让我们列出一个服务应该能够执行的操作的小清单:
- 定期上传证券及其概览(包括去年的损益表、资产负债表、现金流量)
- 上传历史数据(针对每个交易年度,查找交易收盘价的极值)-定期
- 定期上传最新交易数据
- 定期上传每个证券的定制指标列表
正如预期的那样,我们从头开始为项目选择一个名称: 霍顿
我们正在准备基础设施
这个标题确实很强大,但是,您需要做的就是为 docker-compose 编写一个小配置,其中包含 kafka(和 Zookeeper - 在一个容器中)、kafdrop(如果我们想查看主题中的消息)、mongodb。 我们得到 [docker-compose.yml](
version: '3'
services:
db:
container_name: horton-mongodb-local
image: mongo:4.2-bionic
command: mongod --port 20017
restart: always
ports:
- 20017:20017
environment:
- MONGO_INITDB_DATABASE=horton
- MONGO_INITDB_ROOT_USERNAME=admin
- MONGO_INITDB_ROOT_PASSWORD=admin_password
kafka-service:
container_name: horton-kafka-local
image: obsidiandynamics/kafka
restart: always
ports:
- "2181:2181"
- "9092:9092"
environment:
KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
KAFKA_RESTART_ATTEMPTS: "10"
KAFKA_RESTART_DELAY: "5"
ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"
kafdrop:
container_name: horton-kafdrop-local
image: 'obsidiandynamics/kafdrop:latest'
restart: always
ports:
- '9000:9000'
environment:
KAFKA_BROKERCONNECT: kafka-service:29092
depends_on:
- kafka-service
这里根本没有什么复杂的。 为 kafka 声明了两个监听器:一个(内部)用于复合网络内部,第二个(外部)用于来自外部的请求,因此它们将其转发到外部。 2181 — 动物园管理员端口。 我想,剩下的事情就很清楚了。
准备项目的框架
在基本版本中,我们项目的结构应该如下所示:
horton
├── docker-compose.yml
└── horton
├── agents.py *
├── alphavantage.py *
├── app.py *
├── config.py
├── database
│ ├── connect.py
│ ├── cruds
│ │ ├── base.py
│ │ ├── __init__.py
│ │ └── security.py *
│ └── __init__.py
├── __init__.py
├── records.py *
└── tasks.py *
*我注意到的一切 我们还没有触及它,我们只是创建空文件。**
我们创建了一个结构。 现在让我们添加必要的依赖项,编写配置并连接到 mongodb。 我不会提供文章中文件的全文,以免耽误时间,但我会提供必要版本的链接。
让我们从项目的依赖项和元数据开始 -
接下来,我们开始安装依赖项并创建 virtualenv (或者您可以自己创建 venv 文件夹并激活环境):
pip3 install poetry (если ещё не установлено)
poetry install
现在让我们创建
连接到 Mongo 时,一切都非常简单。 宣布
接下来会发生什么?
文章不是很长,因为这里我只讲动机和准备,所以不要怪我——我保证下一部分会有动作和图形。
因此,在下一部分中我们:
- 让我们在 aiohttp 上为 alphavantage 编写一个小型客户端,并请求我们需要的端点。
- 让我们创建一个代理来收集证券及其历史价格的数据。
来源: habr.com