Cum am ajuns să trăiesc așa?
Nu cu mult timp în urmă a trebuit să lucrez la backend-ul unui proiect foarte încărcat, în care a fost necesar să organizez execuția regulată a unui număr mare de sarcini de fundal cu calcule complexe și solicitări pentru servicii terțe. Proiectul este asincron și înainte de a veni eu, avea un mecanism simplu pentru sarcinile de lansare cron: o buclă care verifică ora curentă și lansează grupuri de coroutine prin gather - această abordare s-a dovedit a fi acceptabilă până când au existat zeci și sute de astfel de corutine , însă, când numărul lor a depășit două mii, a trebuit să mă gândesc să organizez o coadă normală de sarcini cu un broker, mai mulți muncitori și așa mai departe.
Mai întâi am decis să încerc țelina, pe care o mai folosisem. Datorită naturii asincrone a proiectului, m-am scufundat în întrebare și am văzut
Voi spune asta, proiectul este foarte interesant și funcționează cu succes în alte aplicații ale echipei noastre, iar autorul însuși spune că a reușit să-l lanseze în producție folosind un pool asincron. Dar, din păcate, nu mi s-a potrivit cu adevărat, după cum sa dovedit
În acest sens, am început să caut alternative și l-a găsit! Creatorii de țelină, mai exact, așa cum am înțeles-o
De asemenea, poți să te uiți
Ce facem?
Deci, într-o serie scurtă de articole, vă voi arăta cum să colectați date din sarcinile de fundal folosind Faust. Sursa pentru proiectul nostru exemplu va fi, după cum sugerează și numele,
PS Judecând după încrederea cu care a fost scris punctul despre monitorizare, cred că cititorul de la sfârșitul ultimului articol va arăta în continuare cam așa:
Cerințele proiectului
Datorită faptului că am promis deja, să facem o mică listă cu ceea ce ar trebui să poată face serviciul:
- Încărcați titluri de valoare și o prezentare generală a acestora (inclusiv profituri și pierderi, bilanţ, flux de numerar - pentru ultimul an) - în mod regulat
- Încărcați date istorice (pentru fiecare an de tranzacționare, găsiți valori extreme ale prețului de închidere al tranzacționării) - în mod regulat
- Încărcați cele mai recente date de tranzacționare - în mod regulat
- Încărcați o listă personalizată de indicatori pentru fiecare securitate - în mod regulat
După cum era de așteptat, alegem un nume pentru proiect de la zero: Horton
Pregătim infrastructura
Titlul este cu siguranță puternic, totuși, tot ce trebuie să faceți este să scrieți o mică configurație pentru docker-compose cu kafka (și zookeeper - într-un singur container), kafdrop (dacă vrem să ne uităm la mesaje în subiecte), mongodb. Primim [docker-compose.yml](
version: '3'
services:
db:
container_name: horton-mongodb-local
image: mongo:4.2-bionic
command: mongod --port 20017
restart: always
ports:
- 20017:20017
environment:
- MONGO_INITDB_DATABASE=horton
- MONGO_INITDB_ROOT_USERNAME=admin
- MONGO_INITDB_ROOT_PASSWORD=admin_password
kafka-service:
container_name: horton-kafka-local
image: obsidiandynamics/kafka
restart: always
ports:
- "2181:2181"
- "9092:9092"
environment:
KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
KAFKA_RESTART_ATTEMPTS: "10"
KAFKA_RESTART_DELAY: "5"
ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"
kafdrop:
container_name: horton-kafdrop-local
image: 'obsidiandynamics/kafdrop:latest'
restart: always
ports:
- '9000:9000'
environment:
KAFKA_BROKERCONNECT: kafka-service:29092
depends_on:
- kafka-service
Nu este nimic complicat aici. Doi ascultători au fost declarați pentru kafka: unul (intern) pentru utilizare în interiorul rețelei compozite, iar al doilea (extern) pentru solicitări din exterior, așa că l-au transmis în exterior. 2181 — port grădina zoologică. Restul, cred, este clar.
Pregătirea scheletului proiectului
În versiunea de bază, structura proiectului nostru ar trebui să arate astfel:
horton
├── docker-compose.yml
└── horton
├── agents.py *
├── alphavantage.py *
├── app.py *
├── config.py
├── database
│ ├── connect.py
│ ├── cruds
│ │ ├── base.py
│ │ ├── __init__.py
│ │ └── security.py *
│ └── __init__.py
├── __init__.py
├── records.py *
└── tasks.py *
*Tot ce am notat Încă nu îl atingem, doar creăm fișiere goale.**
Am creat o structură. Acum să adăugăm dependențele necesare, să scriem configurația și să ne conectăm la mongodb. Nu voi furniza textul integral al fișierelor din articol, pentru a nu-l întârzia, dar voi oferi link-uri către versiunile necesare.
Să începem cu dependențe și meta despre proiect -
În continuare, începem să instalăm dependențe și să creăm un virtualenv (sau puteți crea singur folderul venv și activați mediul):
pip3 install poetry (если ещё не установлено)
poetry install
Acum să creăm
Când vă conectați la Mongo, totul este destul de simplu. a anunţat
Ce se va întâmpla în continuare?
Articolul nu este foarte lung, deoarece aici vorbesc doar despre motivație și pregătire, așa că nu mă învinovăți - promit că următoarea parte va avea acțiune și grafică.
Deci, în această parte următoare:
- Să scriem un client mic pentru alphaavantage pe aiohttp cu solicitări pentru punctele finale de care avem nevoie.
- Să creăm un agent care va colecta date despre valorile mobiliare și prețurile istorice pentru acestea.
Sursa: www.habr.com