Attività di background su Faust, parte I: un'introduzione

Attività di background su Faust, parte I: un'introduzione

Come sono arrivato a vivere così?

Non molto tempo fa ho dovuto lavorare sul backend di un progetto molto carico, in cui era necessario organizzare l'esecuzione regolare di un gran numero di attività in background con calcoli complessi e richieste di servizi di terze parti. Il progetto è asincrono e prima del mio arrivo aveva un semplice meccanismo per le attività di avvio del cron: un ciclo che controllava l'ora corrente e lanciava gruppi di coroutine tramite raccolta - questo approccio si è rivelato accettabile finché non sono arrivate dozzine e centinaia di coroutine di questo tipo , tuttavia, quando il loro numero ha superato i duemila, ho dovuto pensare a organizzare una normale coda di attività con un broker, diversi lavoratori e così via.

Per prima cosa ho deciso di provare Celery, che avevo usato prima. A causa della natura asincrona del progetto, mi sono tuffato nella domanda e ho visto ArticoloOltre progetto, creato dall'autore dell'articolo.

Dirò questo, il progetto è molto interessante e funziona con successo in altre applicazioni del nostro team, e l'autore stesso afferma di essere riuscito a metterlo in produzione utilizzando un pool asincrono. Ma, sfortunatamente, non mi andava bene, come si è scoperto problema con avvio di gruppo di compiti (vedi. gruppo). Al momento in cui scrivo problema è già chiuso però i lavori vanno avanti da un mese. In ogni caso, buona fortuna all'autore e tutto il meglio, visto che ci sono già cose che funzionano sulla lib... in generale, il punto è in me e lo strumento si è rivelato umido per me. Inoltre, alcune attività avevano 2-3 richieste http a servizi diversi, quindi anche durante l'ottimizzazione delle attività creiamo 4mila connessioni TCP, circa ogni 2 ore - non molto buono... Vorrei creare una sessione per un tipo di compito quando si avviano i lavoratori. Qualcosa in più sul gran numero di richieste via aiohttp qui.

A questo proposito, ho iniziato a cercare альтернативы e l'ho trovato! I creatori del sedano, in particolare, a quanto ho capito Chiedi a Solem, è stata creata Faust, originariamente per il progetto Robin Hood. Faust si ispira a Kafka Streams e lavora con Kafka come broker, Rocksdb viene utilizzato anche per archiviare i risultati del lavoro degli agenti e, cosa più importante, la libreria è asincrona.

Inoltre, puoi guardare confronto veloce sedano e faust dai creatori di quest'ultimo: le loro differenze, differenze tra broker, implementazione di un compito elementare. Tutto è abbastanza semplice, tuttavia, una caratteristica interessante di Faust attira l'attenzione: i dati digitati per la trasmissione all'argomento.

Cosa faremo

Quindi, in una breve serie di articoli, ti mostrerò come raccogliere dati da attività in background utilizzando Faust. La fonte per il nostro progetto di esempio sarà, come suggerisce il nome, alfavantage.co. Dimostrerò come scrivere agenti (sink, argomenti, partizioni), come eseguire l'esecuzione regolare (cron), i comandi faust cli più convenienti (un wrapper su clic), il semplice clustering e alla fine allegheremo un datadog ( lavorando fuori dagli schemi) e provare a vedere qualcosa. Per memorizzare i dati raccolti utilizzeremo mongodb e motore per la connessione.

PS A giudicare dalla sicurezza con cui è stato scritto il punto sul monitoraggio, penso che il lettore alla fine dell'ultimo articolo apparirà ancora più o meno così:

Attività di background su Faust, parte I: un'introduzione

Requisiti del progetto

Visto che l'ho già promesso, facciamo un piccolo elenco di cosa dovrebbe essere in grado di fare il servizio:

  1. Carica regolarmente i titoli e una loro panoramica (inclusi profitti e perdite, stato patrimoniale, flusso di cassa - per l'ultimo anno).
  2. Carica i dati storici (per ogni anno di negoziazione, trova i valori estremi del prezzo di chiusura delle negoziazioni) - regolarmente
  3. Carica gli ultimi dati di trading - regolarmente
  4. Carica regolarmente un elenco personalizzato di indicatori per ciascun titolo

Come previsto, scegliamo da zero un nome per il progetto: Horton

Stiamo preparando le infrastrutture

Il titolo è sicuramente forte, tuttavia, tutto ciò che devi fare è scrivere una piccola configurazione per docker-compose con kafka (e zookeeper - in un contenitore), kafdrop (se vogliamo guardare i messaggi negli argomenti), mongodb. Noi abbiamo [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) della seguente forma:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

Non c'è niente di complicato qui. Per Kafka sono stati dichiarati due ascoltatori: uno (interno) da utilizzare all'interno della rete composita e il secondo (esterno) per le richieste dall'esterno, quindi lo hanno inoltrato all'esterno. 2181: porto dei guardiani dello zoo. Il resto, credo, è chiaro.

Preparazione dello scheletro del progetto

Nella versione base, la struttura del nostro progetto dovrebbe assomigliare a questa:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Tutto quello che ho notato Non lo tocchiamo ancora, creiamo solo file vuoti.**

Abbiamo creato una struttura. Ora aggiungiamo le dipendenze necessarie, scriviamo la configurazione e connettiamoci a mongodb. Non fornirò il testo completo dei file nell'articolo, per non ritardarlo, ma fornirò i collegamenti alle versioni necessarie.

Iniziamo con le dipendenze e i metadati del progetto: pyproject.toml

Successivamente, iniziamo a installare le dipendenze e a creare un virtualenv (oppure puoi creare tu stesso la cartella venv e attivare l'ambiente):

pip3 install poetry (если ещё не установлено)
poetry install

Ora creiamo config.yml - Credenziali e dove bussare. Puoi inserire immediatamente i dati per alphavantage lì. Bene, passiamo a config.py — estrai i dati per l'applicazione dalla nostra configurazione. Sì, lo confesso, ho usato la mia lib... sitri.

Quando ci si connette a Mongo, tutto è abbastanza semplice. annunciato classe cliente per connettersi e classe base per cruds, per rendere più semplice l'esecuzione di query sulle collezioni.

Che cosa succederà dopo?

L'articolo non è molto lungo, dato che qui parlo solo di motivazione e preparazione, quindi non biasimarmi: prometto che la parte successiva avrà azione e grafica.

Quindi, in questa parte successiva:

  1. Scriviamo un piccolo client per alphavantage su aiohttp con le richieste per gli endpoint di cui abbiamo bisogno.
  2. Creiamo un agente che raccoglierà dati sui titoli e sui prezzi storici per essi.

Codice del progetto

Codice per questa parte

Fonte: habr.com

Aggiungi un commento