Làm sao tôi lại có thể sống như thế này?
Cách đây không lâu, tôi phải làm việc ở phần phụ trợ của một dự án có tải trọng cao, trong đó cần phải tổ chức thực hiện thường xuyên một số lượng lớn các tác vụ nền với các tính toán và yêu cầu phức tạp đối với các dịch vụ của bên thứ ba. Dự án này không đồng bộ và trước khi tôi đến, nó có một cơ chế đơn giản cho các tác vụ khởi chạy cron: một vòng lặp kiểm tra thời gian hiện tại và khởi chạy các nhóm coroutine thông qua thu thập - cách tiếp cận này hóa ra có thể chấp nhận được cho đến khi có hàng chục, hàng trăm coroutine như vậy Tuy nhiên, khi số lượng của họ vượt quá hai nghìn, tôi phải nghĩ đến việc tổ chức một hàng nhiệm vụ bình thường với một người môi giới, một số công nhân, v.v.
Đầu tiên tôi quyết định dùng thử Celery, loại mà tôi đã từng sử dụng trước đây. Do tính chất không đồng bộ của dự án, tôi đã đi sâu vào câu hỏi và thấy
Tôi sẽ nói điều này, dự án rất thú vị và hoạt động khá thành công trong các ứng dụng khác của nhóm chúng tôi và bản thân tác giả nói rằng anh ấy có thể đưa nó vào sản xuất bằng cách sử dụng một nhóm không đồng bộ. Nhưng thật không may, hóa ra nó lại không thực sự phù hợp với tôi
Về vấn đề này, tôi bắt đầu tìm kiếm lựa chọn thay thế và tìm thấy nó! Cụ thể là những người tạo ra cần tây, theo tôi hiểu
Ngoài ra, bạn có thể nhìn
Chúng ta làm gì?
Vì vậy, trong loạt bài viết ngắn, tôi sẽ hướng dẫn bạn cách thu thập dữ liệu từ các tác vụ nền bằng Faust. Nguồn cho dự án ví dụ của chúng tôi sẽ như tên cho thấy,
Tái bút Đánh giá về sự tự tin mà quan điểm về giám sát được viết ra, tôi nghĩ rằng người đọc ở cuối bài viết trước vẫn sẽ trông giống như thế này:
Yêu cầu dự án
Vì tôi đã hứa nên hãy lập một danh sách nhỏ những gì dịch vụ có thể làm:
- Tải lên chứng khoán và thông tin tổng quan về chúng (bao gồm lãi và lỗ, bảng cân đối kế toán, dòng tiền - trong năm qua) - thường xuyên
- Tải lên dữ liệu lịch sử (đối với mỗi năm giao dịch, tìm giá trị cực trị của giá đóng cửa giao dịch) - thường xuyên
- Tải lên dữ liệu giao dịch mới nhất - thường xuyên
- Tải lên danh sách chỉ báo tùy chỉnh cho từng loại chứng khoán - thường xuyên
Đúng như mong đợi, chúng tôi chọn tên cho dự án từ đầu: Horton
Chúng tôi đang chuẩn bị cơ sở hạ tầng
Tiêu đề chắc chắn rất mạnh, tuy nhiên, tất cả những gì bạn cần làm là viết một cấu hình nhỏ cho docker-compose với kafka (và người quản lý vườn thú - trong một vùng chứa), kafdrop (nếu chúng ta muốn xem tin nhắn trong chủ đề), mongodb. Chúng tôi nhận được [docker-compose.yml](
version: '3'
services:
db:
container_name: horton-mongodb-local
image: mongo:4.2-bionic
command: mongod --port 20017
restart: always
ports:
- 20017:20017
environment:
- MONGO_INITDB_DATABASE=horton
- MONGO_INITDB_ROOT_USERNAME=admin
- MONGO_INITDB_ROOT_PASSWORD=admin_password
kafka-service:
container_name: horton-kafka-local
image: obsidiandynamics/kafka
restart: always
ports:
- "2181:2181"
- "9092:9092"
environment:
KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
KAFKA_RESTART_ATTEMPTS: "10"
KAFKA_RESTART_DELAY: "5"
ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"
kafdrop:
container_name: horton-kafdrop-local
image: 'obsidiandynamics/kafdrop:latest'
restart: always
ports:
- '9000:9000'
environment:
KAFKA_BROKERCONNECT: kafka-service:29092
depends_on:
- kafka-service
Không có gì phức tạp ở đây cả. Hai trình nghe đã được khai báo cho kafka: một (nội bộ) để sử dụng bên trong mạng tổng hợp và trình nghe thứ hai (bên ngoài) cho các yêu cầu từ bên ngoài, vì vậy họ đã chuyển tiếp nó ra bên ngoài. 2181 — cảng vườn thú. Phần còn lại, tôi nghĩ, là rõ ràng.
Chuẩn bị bộ xương của dự án
Trong phiên bản cơ bản, cấu trúc dự án của chúng ta sẽ như thế này:
horton
├── docker-compose.yml
└── horton
├── agents.py *
├── alphavantage.py *
├── app.py *
├── config.py
├── database
│ ├── connect.py
│ ├── cruds
│ │ ├── base.py
│ │ ├── __init__.py
│ │ └── security.py *
│ └── __init__.py
├── __init__.py
├── records.py *
└── tasks.py *
*Mọi thứ tôi ghi nhận Chúng tôi chưa chạm vào nó, chúng tôi chỉ tạo các tệp trống.**
Chúng tôi đã tạo ra một cấu trúc. Bây giờ hãy thêm các phụ thuộc cần thiết, viết cấu hình và kết nối với mongodb. Tôi sẽ không cung cấp toàn bộ nội dung của các tệp trong bài viết để không làm chậm trễ nó nhưng tôi sẽ cung cấp liên kết đến các phiên bản cần thiết.
Hãy bắt đầu với phần phụ thuộc và meta về dự án -
Tiếp theo, chúng tôi bắt đầu cài đặt các phần phụ thuộc và tạo virtualenv (hoặc bạn có thể tự tạo thư mục venv và kích hoạt môi trường):
pip3 install poetry (если ещё не установлено)
poetry install
Bây giờ hãy tạo
Khi kết nối với Mongo, mọi thứ khá đơn giản. công bố
Chuyện gì sẽ xảy ra tiếp theo?
Bài viết không dài lắm, ở đây mình chỉ nói về động lực và sự chuẩn bị thôi nên đừng trách mình - mình hứa phần sau sẽ có hành động và đồ họa.
Vì vậy, trong phần tiếp theo này, chúng tôi:
- Hãy viết một ứng dụng khách nhỏ cho alphavantage trên aiohttp với các yêu cầu về điểm cuối mà chúng ta cần.
- Hãy tạo một đại lý sẽ thu thập dữ liệu về chứng khoán và giá lịch sử cho họ.
Nguồn: www.habr.com