Nhiệm vụ cơ bản về Faust, Phần I: Giới thiệu

Nhiệm vụ cơ bản về Faust, Phần I: Giới thiệu

Làm sao tôi lại có thể sống như thế này?

Cách đây không lâu, tôi phải làm việc ở phần phụ trợ của một dự án có tải trọng cao, trong đó cần phải tổ chức thực hiện thường xuyên một số lượng lớn các tác vụ nền với các tính toán và yêu cầu phức tạp đối với các dịch vụ của bên thứ ba. Dự án này không đồng bộ và trước khi tôi đến, nó có một cơ chế đơn giản cho các tác vụ khởi chạy cron: một vòng lặp kiểm tra thời gian hiện tại và khởi chạy các nhóm coroutine thông qua thu thập - cách tiếp cận này hóa ra có thể chấp nhận được cho đến khi có hàng chục, hàng trăm coroutine như vậy Tuy nhiên, khi số lượng của họ vượt quá hai nghìn, tôi phải nghĩ đến việc tổ chức một hàng nhiệm vụ bình thường với một người môi giới, một số công nhân, v.v.

Đầu tiên tôi quyết định dùng thử Celery, loại mà tôi đã từng sử dụng trước đây. Do tính chất không đồng bộ của dự án, tôi đã đi sâu vào câu hỏi và thấy Bài viếtCũng như dự án, do tác giả bài viết tạo ra.

Tôi sẽ nói điều này, dự án rất thú vị và hoạt động khá thành công trong các ứng dụng khác của nhóm chúng tôi và bản thân tác giả nói rằng anh ấy có thể đưa nó vào sản xuất bằng cách sử dụng một nhóm không đồng bộ. Nhưng thật không may, hóa ra nó lại không thực sự phù hợp với tôi vấn đề với việc khởi động nhiệm vụ theo nhóm (xem. nhóm). Tại thời điểm viết bài vấn đề đã đóng cửa, tuy nhiên, công việc đã diễn ra được một tháng. Trong mọi trường hợp, xin chúc tác giả may mắn và mọi điều tốt đẹp nhất, vì đã có những thứ đang hoạt động trên lib... nói chung, vấn đề là ở tôi và công cụ này hóa ra lại rất ẩm ướt đối với tôi. Ngoài ra, một số tác vụ có 2-3 yêu cầu http tới các dịch vụ khác nhau, vì vậy ngay cả khi tối ưu hóa tác vụ, chúng tôi vẫn tạo ra 4 nghìn kết nối TCP, khoảng 2 giờ một lần - không tốt lắm... Tôi muốn tạo một phiên cho một loại nhiệm vụ khi bắt đầu công nhân. Nói thêm một chút về số lượng lớn yêu cầu qua aiohttp đây.

Về vấn đề này, tôi bắt đầu tìm kiếm lựa chọn thay thế và tìm thấy nó! Cụ thể là những người tạo ra cần tây, theo tôi hiểu Hỏi trang trọng, đã được tạo ra Faust, ban đầu cho dự án robinhood. Faust lấy cảm hứng từ Kafka Streams và làm việc với Kafka với tư cách là nhà môi giới,rocksdb còn được dùng để lưu trữ kết quả từ công việc của các đại lý và điều quan trọng nhất là thư viện không đồng bộ.

Ngoài ra, bạn có thể nhìn so sánh nhanh cần tây và faust từ những người tạo ra cái sau: sự khác biệt của họ, sự khác biệt giữa các nhà môi giới, việc thực hiện một nhiệm vụ cơ bản. Mọi thứ khá đơn giản, tuy nhiên, một tính năng hay trong faust thu hút sự chú ý - gõ dữ liệu để truyền đến chủ đề.

Chúng ta làm gì?

Vì vậy, trong loạt bài viết ngắn, tôi sẽ hướng dẫn bạn cách thu thập dữ liệu từ các tác vụ nền bằng Faust. Nguồn cho dự án ví dụ của chúng tôi sẽ như tên cho thấy, alphavantage.co. Tôi sẽ trình bày cách viết các tác nhân (phần chìm, chủ đề, phân vùng), cách thực hiện (cron) thông thường, các lệnh faust cli tiện lợi nhất (trình bao bọc khi nhấp chuột), phân cụm đơn giản và cuối cùng chúng ta sẽ đính kèm một datadog ( làm việc vượt trội) và thử một cái gì đó để xem. Để lưu trữ dữ liệu đã thu thập, chúng tôi sẽ sử dụng mongodb và motor để kết nối.

Tái bút Đánh giá về sự tự tin mà quan điểm về giám sát được viết ra, tôi nghĩ rằng người đọc ở cuối bài viết trước vẫn sẽ trông giống như thế này:

Nhiệm vụ cơ bản về Faust, Phần I: Giới thiệu

Yêu cầu dự án

Vì tôi đã hứa nên hãy lập một danh sách nhỏ những gì dịch vụ có thể làm:

  1. Tải lên chứng khoán và thông tin tổng quan về chúng (bao gồm lãi và lỗ, bảng cân đối kế toán, dòng tiền - trong năm qua) - thường xuyên
  2. Tải lên dữ liệu lịch sử (đối với mỗi năm giao dịch, tìm giá trị cực trị của giá đóng cửa giao dịch) - thường xuyên
  3. Tải lên dữ liệu giao dịch mới nhất - thường xuyên
  4. Tải lên danh sách chỉ báo tùy chỉnh cho từng loại chứng khoán - thường xuyên

Đúng như mong đợi, chúng tôi chọn tên cho dự án từ đầu: Horton

Chúng tôi đang chuẩn bị cơ sở hạ tầng

Tiêu đề chắc chắn rất mạnh, tuy nhiên, tất cả những gì bạn cần làm là viết một cấu hình nhỏ cho docker-compose với kafka (và người quản lý vườn thú - trong một vùng chứa), kafdrop (nếu chúng ta muốn xem tin nhắn trong chủ đề), mongodb. Chúng tôi nhận được [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) có dạng sau:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

Không có gì phức tạp ở đây cả. Hai trình nghe đã được khai báo cho kafka: một (nội bộ) để sử dụng bên trong mạng tổng hợp và trình nghe thứ hai (bên ngoài) cho các yêu cầu từ bên ngoài, vì vậy họ đã chuyển tiếp nó ra bên ngoài. 2181 — cảng vườn thú. Phần còn lại, tôi nghĩ, là rõ ràng.

Chuẩn bị bộ xương của dự án

Trong phiên bản cơ bản, cấu trúc dự án của chúng ta sẽ như thế này:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Mọi thứ tôi ghi nhận Chúng tôi chưa chạm vào nó, chúng tôi chỉ tạo các tệp trống.**

Chúng tôi đã tạo ra một cấu trúc. Bây giờ hãy thêm các phụ thuộc cần thiết, viết cấu hình và kết nối với mongodb. Tôi sẽ không cung cấp toàn bộ nội dung của các tệp trong bài viết để không làm chậm trễ nó nhưng tôi sẽ cung cấp liên kết đến các phiên bản cần thiết.

Hãy bắt đầu với phần phụ thuộc và meta về dự án - pyproject.toml

Tiếp theo, chúng tôi bắt đầu cài đặt các phần phụ thuộc và tạo virtualenv (hoặc bạn có thể tự tạo thư mục venv và kích hoạt môi trường):

pip3 install poetry (если ещё не установлено)
poetry install

Bây giờ hãy tạo config.yml - Thông tin xác thực và nơi để gõ. Bạn có thể ngay lập tức đặt dữ liệu cho alphavantage ở đó. Vâng, chúng ta hãy chuyển sang config.py — trích xuất dữ liệu cho ứng dụng từ config. Vâng, tôi thú nhận, tôi đã sử dụng lib của mình - sitri.

Khi kết nối với Mongo, mọi thứ khá đơn giản. công bố lớp khách hàng để kết nối và lớp cơ sở đối với cruds, để giúp thực hiện truy vấn trên các bộ sưu tập dễ dàng hơn.

Chuyện gì sẽ xảy ra tiếp theo?

Bài viết không dài lắm, ở đây mình chỉ nói về động lực và sự chuẩn bị thôi nên đừng trách mình - mình hứa phần sau sẽ có hành động và đồ họa.

Vì vậy, trong phần tiếp theo này, chúng tôi:

  1. Hãy viết một ứng dụng khách nhỏ cho alphavantage trên aiohttp với các yêu cầu về điểm cuối mà chúng ta cần.
  2. Hãy tạo một đại lý sẽ thu thập dữ liệu về chứng khoán và giá lịch sử cho họ.

Mã số dự án

Mã cho phần này

Nguồn: www.habr.com

Thêm một lời nhận xét