Hannergrond Aufgaben op Faust, Deel II: Agenten an Teams

Hannergrond Aufgaben op Faust, Deel II: Agenten an Teams

Inhaltsverzeechnes

  1. Deel I: Aféierung

  2. Deel II: Agenten an Teams

Wat maache mir hei?

Also, also, den zweeten Deel. Wéi virdru geschriwwen, wäerte mir déi folgend maachen:

  1. Loosst eis e klenge Client fir Alphavantage op aiohttp schreiwen mat Ufroe fir d'Endpunkte déi mir brauchen.

  2. Loosst eis en Agent erstellen deen Daten iwwer Wäertpabeieren a Meta-Informatioune sammelt.

Awer, dat ass wat mir fir de Projet selwer maachen, a wat d'Faustfuerschung ugeet, léiere mir wéi Agenten schreiwen déi Stream-Evenementer vu Kafka veraarbechten, wéi och Kommandoen (Klick-Wrapper) schreiwen, an eisem Fall - fir manuell Push Messagen zum Thema dat den Agent iwwerwaacht.

Virbereedung

AlphaVantage Client

Als éischt schreiwen mer e klenge aiohttp Client fir Ufroe fir Alphavantage.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Eigentlech ass alles kloer dovunner:

  1. D'AlphaVantage API ass ganz einfach a schéin entworf, also hunn ech beschloss all Ufroen duerch d'Method ze maachen construct_query wou am Tour gëtt et en http Uruff.

  2. Ech bréngen all Felder op snake_case fir Komfort.

  3. Gutt, de Logger.catch Dekoratioun fir schéin an informativ Traceback Output.

PS Vergiesst net den Alphavantage Token lokal op config.yml ze addéieren, oder d'Ëmweltvariabel exportéieren HORTON_SERVICE_APIKEY. Mir kréien en Token hei.

CRUD Klass

Mir wäerten eng Sammlung vu Wäertpabeieren hunn fir Meta-Informatioun iwwer Wäertpabeieren ze späicheren.

database/security.py

Menger Meenung no ass et net néideg hei eppes z'erklären, an d'Basisklass selwer ass ganz einfach.

get_app()

Loosst eis eng Funktioun addéieren fir en Applikatiounsobjekt ze kreéieren app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Fir elo wäerte mir déi einfachst Applikatioun kreéieren, e bësse méi spéit wäerte mir se awer ausbauen, fir Iech net ze waarden, hei Referenze zu App-Klass. Ech roden Iech och d'Astellungsklass ze kucken, well se fir déi meescht Astellunge verantwortlech ass.

Haaptsaach

Agent fir eng Lëscht vu Wäertpabeieren ze sammelen an z'erhalen

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Also, als éischt kréien mir de Faust Applikatiounsobjekt - et ass ganz einfach. Als nächst erkläre mir explizit en Thema fir eisen Agent ... Hei ass et derwäert ze ernimmen wat et ass, wat den internen Parameter ass a wéi dëst anescht arrangéiert ka ginn.

  1. Themen am Kafka, wa mir déi genau Definitioun wësse wëllen, ass et besser ze liesen aus. Dokument, Oder Dir kënnt liesen Kompendium op Habré op Russesch, wou och alles zimlech genee reflektéiert gëtt :)

  2. Parameter intern, ganz gutt am faust doc beschriwwen, erlaabt eis d'Thema direkt am Code ze konfiguréieren, natierlech, dat heescht d'Parameteren, déi vun de Faust Entwéckler geliwwert ginn, zum Beispill: Retentioun, Retentiounspolitik (par défaut läschen, awer Dir kënnt astellen kompakt), Zuel vun de Partitionen pro Thema (Partiturenze maachen, zum Beispill, manner wéi global Bedeitung Applikatiounen faust).

  3. Am Allgemengen kann den Agent e verwalteten Thema mat globale Wäerter erstellen, awer ech wëll alles explizit deklaréieren. Zousätzlech kënnen e puer Parameteren (zum Beispill d'Zuel vun de Partitionen oder d'Retentiounspolitik) vum Thema an der Agent Annonce net konfiguréiert ginn.

    Hei ass wéi et ausgesäit ouni d'Thema manuell ze definéieren:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Gutt, loosst eis elo beschreiwen wat eisen Agent wäert maachen :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Also, am Ufank vum Agent, öffnen mir eng aiohttp Sessioun fir Ufroen duerch eise Client. Also, wann Dir en Aarbechter start, wann eisen Agent lancéiert gëtt, gëtt eng Sessioun direkt opgemaach - eng, fir déi ganz Zäit wou den Aarbechter leeft (oder e puer, wann Dir de Parameter ännert) gläichzäiteg vun engem Agent mat enger Standardunitéit).

Als nächst verfollege mir de Stream (mir setzen de Message an _, well mir, an dësem Agent, egal iwwer den Inhalt) vun Messagen aus eisem Thema, wa se existéieren am aktuellen Offset, soss wäert eisen Zyklus op hir Arrivée waarden. Gutt, an eiser Loop protokolléiere mir den Empfang vun der Noriicht, kréien eng Lëscht vun aktive (get_securities gëtt nëmmen aktiv als Standard zréck, kuckt Clientcode) Wäertpabeieren a späicheren se an d'Datebank, kontrolléiert ob et eng Sécherheet mat deemselwechten Ticker gëtt an Austausch an der Datebank , wann et ass, da gëtt et (de Pabeier) einfach aktualiséiert.

Loosst eis eis Kreatioun starten!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS Fonctiounen Web Komponente Ech wäert de Faust net an den Artikelen berücksichtegen, also setzen mir de passende Fändel.

An eisem Startbefehl hu mir de Faust gesot, wou een no der Applikatiounsobjet sicht a wat mat deem ze maachen (en Aarbechter starten) mam Info-Logoutputniveau. Mir kréien déi folgend Ausgang:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Et ass lieweg!!!

Loosst eis d'Partitionsset kucken. Wéi mir kënne gesinn, gouf en Thema erstallt mam Numm dee mir am Code bezeechent hunn, d'Standardzuel vun de Partitionen (8, geholl aus topic_Partitionen - Applikatiounsobjektparameter), well mir keen individuellen Wäert fir eist Thema uginn hunn (iwwer Partitionen). De lancéierten Agent am Aarbechter gëtt all 8 Partitionen zougewisen, well et deen eenzegen ass, awer dëst wäert méi am Detail am Deel iwwer Clustering diskutéiert ginn.

Gutt, elo kënne mir op eng aner Terminalfenster goen an en eidele Message un eist Thema schécken:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS benotzt @ mir weisen datt mir e Message un en Thema mam Numm "collect_securities" schécken.

An dësem Fall ass de Message op d'Partition 6 gaang - Dir kënnt dëst kontrolléieren andeems Dir op kafdrop op gitt localhost:9000

Gitt an d'Terminalfenster mat eisem Aarbechter, gesi mir e gléckleche Message geschéckt mat Loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Mir kënnen och Mongo kucken (mat Robo3T oder Studio3T) a kucken datt d'Sécherheeten an der Datebank sinn:

Ech si kee Milliardär, an dofir si mir zefridden mat der éischter Vueoptioun.

Hannergrond Aufgaben op Faust, Deel II: Agenten an TeamsHannergrond Aufgaben op Faust, Deel II: Agenten an Teams

Gléck a Freed - den éischten Agent ass prett :)

Agent prett, laang liewen den neien Agent!

Jo, Hären, mir hunn nëmmen 1/3 vum Wee, deen an dësem Artikel preparéiert ass, ofgedeckt, awer net decouragéiert, well elo gëtt et méi einfach.

Also elo brauche mir en Agent deen Meta Informatioun sammelt an se an e Sammeldokument setzt:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Well dësen Agent Informatioun iwwer eng spezifesch Sécherheet veraarbecht, musse mir den Ticker (Symbol) vun dëser Sécherheet an der Noriicht uginn. Fir dësen Zweck am Faust ginn et Records - Klassen déi de Message Schema am Agent Thema erklären.

An dësem Fall, loosst eis goen records.py a beschreiwen wéi de Message fir dëst Thema soll ausgesinn:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Wéi Dir vläicht scho virgestallt hutt, benotzt de Faust d'Python-Typ Annotatioun fir de Message Schema ze beschreiwen, dofir ass d'Mindest Versioun ënnerstëtzt vun der Bibliothéik 3.6.

Loosst eis zréck op den Agent, setzen d'Typen a fügen se derbäi:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Wéi Dir gesitt, passéiere mir en neie Parameter mat engem Schema un d'Thema Initialiséierungsmethod - value_type. Weider follegt alles dem selwechte Schema, also gesinn ech kee Sënn fir op soss eppes ze bleiwen.

Gutt, de leschte Touch ass en Uruff un de Meta Informatiounssammlung Agent fir collect_securitites ze addéieren:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Mir benotzen de virdru ugekënnegt Schema fir de Message. An dësem Fall hunn ech d'.cast Method benotzt well mir net op d'Resultat vum Agent musse waarden, awer et ass derwäert ze ernimmen datt Weeër schéckt e Message zum Thema:

  1. Besetzung - blockéiert net well et kee Resultat erwaart. Dir kënnt d'Resultat net an en anert Thema als Message schécken.

  2. schécken - blockéiert net well et kee Resultat erwaart. Dir kënnt en Agent spezifizéieren am Thema op deen d'Resultat geet.

  3. froen - waart op e Resultat. Dir kënnt en Agent spezifizéieren am Thema op deen d'Resultat geet.

Also, dat ass alles mat Agenten fir haut!

D'Dream Team

Déi lescht Saach, déi ech versprach hunn an dësem Deel ze schreiwen ass Kommandoen. Wéi virdru scho gesot, Kommandoen am Faust sinn e Wrapper ronderëm Klick. Tatsächlech befestegt de Faust einfach eise personaliséierte Kommando op seng Interface wann Dir den -A Schlëssel spezifizéiert

No der ugekënnegt Agenten an agents.py eng Funktioun mat engem Dekorateur addéieren app.kommandod'Method nennen gegruewen у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Also, wa mir d'Lëscht vun de Befehle nennen, da wäert eisen neie Kommando dra sinn:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Mir kënnen et benotzen wéi jiddereen aneren, also loosst eis de Faust Aarbechter nei starten an eng vollwäerteg Sammlung vu Wäertpabeieren ufänken:

> faust -A horton.agents start-collect-securities

Wat wäert dann geschéien?

Am nächsten Deel, andeems Dir déi verbleiwen Agenten als Beispill benotzt, wäerte mir de Sinkmechanismus fir d'Sich no Extremen an de Schlusspräisser vum Handel fir d'Joer an d'Cron-Start vun Agenten berücksichtegen.

Dat ass alles fir haut! Merci fir d'Liesen :)

Code fir dësen Deel

Hannergrond Aufgaben op Faust, Deel II: Agenten an Teams

PS Ënnert dem leschten Deel gouf ech iwwer Faust a confluent Kafka gefrot (wat Features huet confluent?). Et schéngt, datt confluent méi funktionell a ville Weeër ass, awer de Fakt ass datt de Faust keng voll Client Ënnerstëtzung fir Confluent huet - dëst folgt aus Beschreiwunge vu Client Restriktiounen am Dok.

Source: will.com

Setzt e Commentaire