Πώς κατέληξα να ζω έτσι;
Πριν από λίγο καιρό έπρεπε να δουλέψω στο backend ενός έργου με μεγάλη φόρτωση, στο οποίο ήταν απαραίτητο να οργανωθεί η τακτική εκτέλεση ενός μεγάλου αριθμού εργασιών παρασκηνίου με περίπλοκους υπολογισμούς και αιτήματα για υπηρεσίες τρίτων. Το έργο είναι ασύγχρονο και πριν έρθω, είχε έναν απλό μηχανισμό για εργασίες εκκίνησης: έναν βρόχο που ελέγχει την τρέχουσα ώρα και εκκινεί ομάδες κορουτινών μέσω συλλογής - αυτή η προσέγγιση αποδείχτηκε αποδεκτή έως ότου υπήρξαν δεκάδες και εκατοντάδες τέτοιες κορουτίνες , ωστόσο, όταν ο αριθμός τους ξεπέρασε τις δύο χιλιάδες, έπρεπε να σκεφτώ να οργανώσω μια κανονική ουρά εργασιών με έναν μεσίτη, αρκετούς εργάτες κ.λπ.
Πρώτα αποφάσισα να δοκιμάσω το Celery, το οποίο είχα χρησιμοποιήσει πριν. Λόγω της ασύγχρονης φύσης του έργου, βούτηξα στην ερώτηση και είδα
Θα το πω αυτό, το έργο είναι πολύ ενδιαφέρον και λειτουργεί αρκετά επιτυχημένα σε άλλες εφαρμογές της ομάδας μας, και ο ίδιος ο συγγραφέας λέει ότι μπόρεσε να το κυκλοφορήσει στην παραγωγή χρησιμοποιώντας μια ασύγχρονη πισίνα. Αλλά, δυστυχώς, δεν μου ταίριαζε πραγματικά, όπως αποδείχθηκε
Από αυτή την άποψη, άρχισα να ψάχνω εναλλακτικές λύσεις και το βρήκε! Οι δημιουργοί του σέλινου, συγκεκριμένα, όπως το καταλαβαίνω
Επίσης, μπορείτε να κοιτάξετε
Τι κάνουμε?
Έτσι, σε μια σύντομη σειρά άρθρων, θα σας δείξω πώς να συλλέγετε δεδομένα από εργασίες παρασκηνίου χρησιμοποιώντας το Faust. Η πηγή για το παράδειγμα του έργου μας θα είναι, όπως υποδηλώνει το όνομα,
Υ.Γ. Κρίνοντας από τη σιγουριά με την οποία γράφτηκε το σημείο για την παρακολούθηση, νομίζω ότι ο αναγνώστης στο τέλος του τελευταίου άρθρου θα μοιάζει κάπως έτσι:
Απαιτήσεις έργου
Λόγω του γεγονότος ότι έχω ήδη υποσχεθεί, ας κάνουμε μια μικρή λίστα με το τι θα πρέπει να μπορεί να κάνει η υπηρεσία:
- Ανεβάστε τίτλους και μια επισκόπηση αυτών (συμπεριλαμβανομένων των κερδών και ζημιών, του ισολογισμού, των ταμειακών ροών - για το τελευταίο έτος) - τακτικά
- Ανεβάστε ιστορικά δεδομένα (για κάθε έτος διαπραγμάτευσης, βρείτε ακραίες τιμές της τιμής κλεισίματος των συναλλαγών) - τακτικά
- Ανεβάστε τα πιο πρόσφατα δεδομένα συναλλαγών - τακτικά
- Ανεβάστε μια προσαρμοσμένη λίστα δεικτών για κάθε ασφάλεια - τακτικά
Όπως ήταν αναμενόμενο, επιλέγουμε ένα όνομα για το έργο από το μπλε: horton
Ετοιμάζουμε τις υποδομές
Ο τίτλος είναι σίγουρα δυνατός, ωστόσο, το μόνο που χρειάζεται να κάνετε είναι να γράψετε ένα μικρό config για docker-compose με kafka (και zookeeper - σε ένα δοχείο), kafdrop (αν θέλουμε να δούμε μηνύματα σε θέματα), mongodb. Παίρνουμε [docker-compose.yml](
version: '3'
services:
db:
container_name: horton-mongodb-local
image: mongo:4.2-bionic
command: mongod --port 20017
restart: always
ports:
- 20017:20017
environment:
- MONGO_INITDB_DATABASE=horton
- MONGO_INITDB_ROOT_USERNAME=admin
- MONGO_INITDB_ROOT_PASSWORD=admin_password
kafka-service:
container_name: horton-kafka-local
image: obsidiandynamics/kafka
restart: always
ports:
- "2181:2181"
- "9092:9092"
environment:
KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
KAFKA_RESTART_ATTEMPTS: "10"
KAFKA_RESTART_DELAY: "5"
ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"
kafdrop:
container_name: horton-kafdrop-local
image: 'obsidiandynamics/kafdrop:latest'
restart: always
ports:
- '9000:9000'
environment:
KAFKA_BROKERCONNECT: kafka-service:29092
depends_on:
- kafka-service
Δεν υπάρχει τίποτα περίπλοκο εδώ. Δηλώθηκαν δύο ακροατές για το kafka: ο ένας (εσωτερικός) για χρήση εντός του σύνθετου δικτύου και ο δεύτερος (εξωτερικός) για αιτήματα από το εξωτερικό, οπότε το διαβίβασαν εκτός. 2181 — λιμάνι zookeeper. Τα υπόλοιπα, νομίζω, είναι ξεκάθαρα.
Προετοιμασία του σκελετού του έργου
Στη βασική έκδοση, η δομή του έργου μας θα πρέπει να μοιάζει με αυτό:
horton
├── docker-compose.yml
└── horton
├── agents.py *
├── alphavantage.py *
├── app.py *
├── config.py
├── database
│ ├── connect.py
│ ├── cruds
│ │ ├── base.py
│ │ ├── __init__.py
│ │ └── security.py *
│ └── __init__.py
├── __init__.py
├── records.py *
└── tasks.py *
*Όλα όσα σημείωσα Δεν το αγγίζουμε ακόμα, απλώς δημιουργούμε άδεια αρχεία.**
Δημιουργήσαμε μια δομή. Τώρα ας προσθέσουμε τις απαραίτητες εξαρτήσεις, γράψουμε το config και συνδεθείτε στο mongodb. Δεν θα παράσχω το πλήρες κείμενο των αρχείων του άρθρου, για να μην το καθυστερήσω, αλλά θα παράσχω συνδέσμους προς τις απαραίτητες εκδόσεις.
Ας ξεκινήσουμε με εξαρτήσεις και meta σχετικά με το έργο -
Στη συνέχεια, ξεκινάμε την εγκατάσταση εξαρτήσεων και τη δημιουργία ενός virtualenv (ή μπορείτε να δημιουργήσετε μόνοι σας τον φάκελο venv και να ενεργοποιήσετε το περιβάλλον):
pip3 install poetry (если ещё не установлено)
poetry install
Τώρα ας δημιουργήσουμε
Όταν συνδέεστε στο Mongo, όλα είναι πολύ απλά. ανακοινώθηκε
Τι θα συμβεί στη συνέχεια;
Το άρθρο δεν είναι πολύ μεγάλο, καθώς εδώ μιλάω μόνο για κίνητρα και προετοιμασία, οπότε μην με κατηγορείτε - υπόσχομαι ότι το επόμενο μέρος θα έχει δράση και γραφικά.
Έτσι, σε αυτό ακριβώς το επόμενο μέρος:
- Ας γράψουμε έναν μικρό πελάτη για το alphavantage στο aiohttp με αιτήματα για τα τελικά σημεία που χρειαζόμαστε.
- Ας δημιουργήσουμε έναν πράκτορα που θα συλλέγει δεδομένα για τίτλους και ιστορικές τιμές για αυτούς.
Πηγή: www.habr.com