Εργασίες παρασκηνίου στο Faust, Μέρος II: Πράκτορες και ομάδες

Εργασίες παρασκηνίου στο Faust, Μέρος II: Πράκτορες και ομάδες

πίνακας περιεχομένων

  1. Μέρος Ι: Εισαγωγή

  2. Μέρος II: Πράκτορες και ομάδες

Τι κάνουμε εδώ;

Λοιπόν, λοιπόν, το δεύτερο μέρος. Όπως γράφτηκε νωρίτερα, σε αυτό θα κάνουμε τα εξής:

  1. Ας γράψουμε έναν μικρό πελάτη για το alphavantage στο aiohttp με αιτήματα για τα τελικά σημεία που χρειαζόμαστε.

  2. Ας δημιουργήσουμε έναν πράκτορα που θα συλλέγει δεδομένα για τίτλους και μετα-πληροφορίες για αυτούς.

Αλλά, αυτό θα κάνουμε για το ίδιο το έργο, και όσον αφορά την έρευνα του Faust, θα μάθουμε πώς να γράφουμε πράκτορες που επεξεργάζονται συμβάντα ροής από το kafka, καθώς και πώς να γράφουμε εντολές (click wrapper), στην περίπτωσή μας - για μη αυτόματα μηνύματα ώθησης στο θέμα που παρακολουθεί ο πράκτορας.

Εκπαίδευση

Πελάτης AlphaVantage

Αρχικά, ας γράψουμε έναν μικρό πελάτη aiohttp για αιτήματα στο alphavantage.

alphavantage.py

Φθείρων

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Στην πραγματικότητα, όλα είναι ξεκάθαρα από αυτό:

  1. Το AlphaVantage API είναι αρκετά απλά και όμορφα σχεδιασμένο, γι' αυτό αποφάσισα να κάνω όλα τα αιτήματα μέσω της μεθόδου construct_query όπου με τη σειρά του υπάρχει μια κλήση http.

  2. Φέρνω όλα τα χωράφια σε snake_case για άνεση.

  3. Λοιπόν, η διακόσμηση logger.catch για όμορφο και ενημερωτικό αποτέλεσμα traceback.

ΥΓ Μην ξεχάσετε να προσθέσετε το διακριτικό alphavantage τοπικά στο config.yml ή να εξαγάγετε τη μεταβλητή περιβάλλοντος HORTON_SERVICE_APIKEY. Λαμβάνουμε ένα κουπόνι εδώ.

Κατηγορία CRUD

Θα έχουμε μια συλλογή τίτλων για την αποθήκευση μετα-πληροφοριών σχετικά με τους τίτλους.

βάση δεδομένων/security.py

Κατά τη γνώμη μου, δεν χρειάζεται να εξηγήσω τίποτα εδώ, και η ίδια η βασική κλάση είναι αρκετά απλή.

λάβε την εφαρμογή()

Ας προσθέσουμε μια συνάρτηση για τη δημιουργία ενός αντικειμένου εφαρμογής app.py

Φθείρων

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Προς το παρόν θα έχουμε την πιο απλή δημιουργία εφαρμογής, λίγο αργότερα θα την επεκτείνουμε, ωστόσο, για να μην σας περιμένουμε, εδώ βιβλιογραφικές αναφορές στην κατηγορία εφαρμογών. Σας συμβουλεύω επίσης να ρίξετε μια ματιά στην κατηγορία ρυθμίσεων, καθώς αυτή είναι υπεύθυνη για τις περισσότερες ρυθμίσεις.

Κύριο σώμα

Πράκτορας συλλογής και τήρησης καταλόγου τίτλων

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Έτσι, πρώτα παίρνουμε το αντικείμενο εφαρμογής faust - είναι αρκετά απλό. Στη συνέχεια, δηλώνουμε ρητά ένα θέμα για τον αντιπρόσωπό μας... Εδώ αξίζει να αναφέρουμε τι είναι, ποια είναι η εσωτερική παράμετρος και πώς μπορεί να τακτοποιηθεί διαφορετικά.

  1. Θέματα στον κάφκα, αν θέλουμε να μάθουμε τον ακριβή ορισμό, καλύτερα να τα διαβάσουμε μακριά από. έγγραφο, ή μπορείτε να διαβάσετε επιτομή στο Habré στα ρωσικά, όπου όλα αντικατοπτρίζονται επίσης με μεγάλη ακρίβεια :)

  2. Εσωτερική παράμετρος, που περιγράφεται αρκετά καλά στο faust doc, μας επιτρέπει να διαμορφώσουμε το θέμα απευθείας στον κώδικα, φυσικά, αυτό σημαίνει τις παραμέτρους που παρέχονται από τους προγραμματιστές faust, για παράδειγμα: διατήρηση, πολιτική διατήρησης (από προεπιλογή διαγραφή, αλλά μπορείτε να ορίσετε συμπαγής), αριθμός κατατμήσεων ανά θέμα (βαθμολογίεςνα κάνει, για παράδειγμα, λιγότερο από παγκόσμια σημασία εφαρμογές faust).

  3. Γενικά, ο πράκτορας μπορεί να δημιουργήσει ένα διαχειριζόμενο θέμα με καθολικές τιμές, ωστόσο, μου αρέσει να δηλώνω τα πάντα ρητά. Επιπλέον, ορισμένες παράμετροι (για παράδειγμα, ο αριθμός των κατατμήσεων ή η πολιτική διατήρησης) του θέματος στη διαφήμιση αντιπροσώπου δεν μπορούν να διαμορφωθούν.

    Δείτε πώς μπορεί να φαίνεται χωρίς να ορίσετε με μη αυτόματο τρόπο το θέμα:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Λοιπόν, τώρα ας περιγράψουμε τι θα κάνει ο ατζέντης μας :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Έτσι, στην αρχή του agent, ανοίγουμε μια περίοδο λειτουργίας aiohttp για αιτήματα μέσω του πελάτη μας. Έτσι, κατά την εκκίνηση ενός εργαζόμενου, όταν εκκινηθεί ο αντιπρόσωπός μας, θα ανοίξει αμέσως μια περίοδος λειτουργίας - μία, για όλο το διάστημα που εκτελείται ο εργαζόμενος (ή πολλές, εάν αλλάξετε την παράμετρο συνοχή από έναν πράκτορα με προεπιλεγμένη μονάδα).

Στη συνέχεια, ακολουθούμε τη ροή (τοποθετούμε το μήνυμα _, αφού εμείς, σε αυτόν τον πράκτορα, δεν μας ενδιαφέρει το περιεχόμενο) των μηνυμάτων από το θέμα μας, εάν υπάρχουν στην τρέχουσα μετατόπιση, διαφορετικά ο κύκλος μας θα περιμένει την άφιξή τους. Λοιπόν, μέσα στο βρόχο μας, καταγράφουμε την παραλαβή του μηνύματος, λαμβάνουμε μια λίστα ενεργών τίτλων (το get_securities επιστρέφει μόνο ενεργά από προεπιλογή, βλ. κωδικό πελάτη) και το αποθηκεύουμε στη βάση δεδομένων, ελέγχοντας αν υπάρχει ασφάλεια με το ίδιο ticker και ανταλλαγή στη βάση δεδομένων , εάν υπάρχει, τότε (το χαρτί) απλώς θα ενημερωθεί.

Ας ξεκινήσουμε τη δημιουργία μας!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS Χαρακτηριστικά συστατικό ιστού Δεν θα θεωρήσω το faust στα άρθρα, οπότε ορίσαμε την κατάλληλη σημαία.

Στην εντολή εκκίνησης, είπαμε στον Faust πού να αναζητήσει το αντικείμενο της εφαρμογής και τι να κάνει με αυτό (εκκίνηση ενός εργαζόμενου) με το επίπεδο εξόδου του αρχείου καταγραφής πληροφοριών. Παίρνουμε την ακόλουθη έξοδο:

Φθείρων

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Είναι ζωντανό!!!

Ας δούμε το σετ διαμερισμάτων. Όπως μπορούμε να δούμε, δημιουργήθηκε ένα θέμα με το όνομα που ορίσαμε στον κώδικα, τον προεπιλεγμένο αριθμό κατατμήσεων (8, από topic_partitions - παράμετρος αντικειμένου εφαρμογής), αφού δεν καθορίσαμε μεμονωμένη τιμή για το θέμα μας (μέσω κατατμήσεων). Στον εκκινημένο παράγοντα στον εργάτη εκχωρούνται και οι 8 κατατμήσεις, αφού είναι το μοναδικό, αλλά αυτό θα συζητηθεί με περισσότερες λεπτομέρειες στο μέρος για την ομαδοποίηση.

Λοιπόν, τώρα μπορούμε να πάμε σε άλλο παράθυρο τερματικού και να στείλουμε ένα κενό μήνυμα στο θέμα μας:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS χρησιμοποιώντας @ Δείχνουμε ότι στέλνουμε ένα μήνυμα σε ένα θέμα με το όνομα "collect_securities".

Σε αυτήν την περίπτωση, το μήνυμα πήγε στο διαμέρισμα 6 - μπορείτε να το ελέγξετε μεταβαίνοντας στο kafdrop on localhost:9000

Πηγαίνοντας στο παράθυρο του τερματικού με τον εργαζόμενό μας, θα δούμε ένα χαρούμενο μήνυμα που αποστέλλεται χρησιμοποιώντας το loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Μπορούμε επίσης να εξετάσουμε το mongo (χρησιμοποιώντας Robo3T ή Studio3T) και να δούμε ότι οι τίτλοι βρίσκονται στη βάση δεδομένων:

Δεν είμαι δισεκατομμυριούχος και επομένως είμαστε ικανοποιημένοι με την πρώτη επιλογή προβολής.

Εργασίες παρασκηνίου στο Faust, Μέρος II: Πράκτορες και ομάδεςΕργασίες παρασκηνίου στο Faust, Μέρος II: Πράκτορες και ομάδες

Ευτυχία και χαρά - ο πρώτος πράκτορας είναι έτοιμος :)

Έτοιμος ο πράκτορας, ζήτω ο νέος πράκτορας!

Ναι, κύριοι, έχουμε καλύψει μόνο το 1/3 της διαδρομής που ετοιμάζει αυτό το άρθρο, αλλά μην αποθαρρύνεστε, γιατί τώρα θα είναι πιο εύκολο.

Τώρα λοιπόν χρειαζόμαστε έναν πράκτορα που συλλέγει μετα-πληροφορίες και τις τοποθετεί σε ένα έγγραφο συλλογής:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Εφόσον αυτός ο πράκτορας θα επεξεργαστεί πληροφορίες σχετικά με μια συγκεκριμένη ασφάλεια, πρέπει να υποδείξουμε το σύμβολο (σύμβολο) αυτής της ασφάλειας στο μήνυμα. Για το σκοπό αυτό στο faust υπάρχουν Εγγραφές — κλάσεις που δηλώνουν το σχήμα μηνυμάτων στο θέμα του πράκτορα.

Σε αυτή την περίπτωση, ας πάμε στο records.py και περιγράψτε πώς θα πρέπει να είναι το μήνυμα για αυτό το θέμα:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Όπως ίσως μαντέψατε, ο Faust χρησιμοποιεί τον σχολιασμό τύπου python για να περιγράψει το σχήμα μηνυμάτων, γι' αυτό η ελάχιστη έκδοση που υποστηρίζεται από τη βιβλιοθήκη είναι 3.6.

Ας επιστρέψουμε στον πράκτορα, ορίζουμε τους τύπους και τον προσθέτουμε:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Όπως μπορείτε να δείτε, περνάμε μια νέα παράμετρο με ένα σχήμα στη μέθοδο αρχικοποίησης θέματος - value_type. Επιπλέον, όλα ακολουθούν το ίδιο σχέδιο, επομένως δεν βλέπω κανένα νόημα να σταθώ σε κάτι άλλο.

Λοιπόν, η τελευταία πινελιά είναι να προσθέσετε μια κλήση στον πράκτορα συλλογής μετα-πληροφοριών για collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Χρησιμοποιούμε το σχήμα που ανακοινώθηκε προηγουμένως για το μήνυμα. Σε αυτή την περίπτωση, χρησιμοποίησα τη μέθοδο .cast μιας και δεν χρειάζεται να περιμένουμε το αποτέλεσμα από τον πράκτορα, αλλά αξίζει να αναφέρουμε ότι τρόπους στείλτε μήνυμα στο θέμα:

  1. cast - δεν μπλοκάρει γιατί δεν περιμένει αποτέλεσμα. Δεν μπορείτε να στείλετε το αποτέλεσμα σε άλλο θέμα ως μήνυμα.

  2. αποστολή - δεν μπλοκάρει γιατί δεν περιμένει αποτέλεσμα. Μπορείτε να καθορίσετε έναν πράκτορα στο θέμα στο οποίο θα πάει το αποτέλεσμα.

  3. ρωτήστε - περιμένει ένα αποτέλεσμα. Μπορείτε να καθορίσετε έναν πράκτορα στο θέμα στο οποίο θα πάει το αποτέλεσμα.

Λοιπόν, όλα αυτά με τους πράκτορες για σήμερα!

Η ονειρική ομάδα

Το τελευταίο πράγμα που υποσχέθηκα να γράψω σε αυτό το μέρος είναι οι εντολές. Όπως αναφέρθηκε προηγουμένως, οι εντολές στο faust είναι ένα περιτύλιγμα γύρω από το κλικ. Στην πραγματικότητα, ο faust απλώς επισυνάπτει την προσαρμοσμένη εντολή μας στη διεπαφή του όταν καθορίζει το κλειδί -A

Μετά τους ανακοινωθέντες πράκτορες πράκτορες.py προσθέστε μια λειτουργία με ένα διακοσμητή app.commandκαλώντας τη μέθοδο ρίχνει у συλλέγω_τίτλους:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Έτσι, εάν καλέσουμε τη λίστα εντολών, η νέα μας εντολή θα είναι σε αυτήν:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Μπορούμε να το χρησιμοποιήσουμε όπως οποιοσδήποτε άλλος, οπότε ας επανεκκινήσουμε το faust worker και ας ξεκινήσουμε μια πλήρη συλλογή τίτλων:

> faust -A horton.agents start-collect-securities

Τι θα συμβεί στη συνέχεια;

Στο επόμενο μέρος, χρησιμοποιώντας τους εναπομείναντες πράκτορες ως παράδειγμα, θα εξετάσουμε τον μηχανισμό απορρόφησης για την αναζήτηση ακραίων τιμών στις τιμές κλεισίματος των συναλλαγών για το έτος και το λανσάρισμα των πρακτόρων.

Αυτά για σήμερα! Ευχαριστώ για την ανάγνωση :)

Κωδικός για αυτό το μέρος

Εργασίες παρασκηνίου στο Faust, Μέρος II: Πράκτορες και ομάδες

ΥΓ Κάτω από το τελευταίο μέρος με ρώτησαν για το faust και το confluent kafka (τι χαρακτηριστικά έχει το confluent;). Φαίνεται ότι το confluent είναι πιο λειτουργικό με πολλούς τρόπους, αλλά το γεγονός είναι ότι το faust δεν έχει πλήρη υποστήριξη πελατών για το confluent - αυτό προκύπτει από περιγραφές περιορισμών πελατών στο έγγρ.

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο