Bakgrundsuppgifter om Faust, del II: Agenter och team

Bakgrundsuppgifter om Faust, del II: Agenter och team

innehÄllsförteckning

  1. Del I: Introduktion

  2. Del II: Agenter och team

Vad gör vi hÀr?

SÄ, sÄ, andra delen. Som skrivits tidigare kommer vi att göra följande i den:

  1. LÄt oss skriva en liten klient för alphavantage pÄ aiohttp med förfrÄgningar om de slutpunkter vi behöver.

  2. LÄt oss skapa en agent som kommer att samla in data om vÀrdepapper och metainformation om dem.

Men det hÀr Àr vad vi kommer att göra för sjÀlva projektet, och nÀr det gÀller snabbforskning, kommer vi att lÀra oss hur man skriver agenter som bearbetar strömningshÀndelser frÄn kafka, sÄvÀl som hur man skriver kommandon (click wrapper), i vÄrt fall - för manuella push-meddelanden till Àmnet som agenten övervakar.

Utbildning

AlphaVantage-klient

LÄt oss först skriva en liten aiohttp-klient för förfrÄgningar till alphavantage.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Egentligen Àr allt tydligt frÄn det:

  1. AlphaVantage API Àr ganska enkelt och vackert designat, sÄ jag bestÀmde mig för att göra alla förfrÄgningar genom metoden construct_query dÀr det i sin tur finns ett http-anrop.

  2. Jag tar med alla fÀlt till snake_case för komfort.

  3. Jo, logger.catch-dekorationen för vacker och informativ spÄrning.

PS Glöm inte att lÀgga till alphavantage-token lokalt till config.yml, eller exportera miljövariabeln HORTON_SERVICE_APIKEY. Vi fÄr en token hÀr.

CRUD klass

Vi kommer att ha en vÀrdepapperssamling för att lagra metainformation om vÀrdepapper.

database/security.py

Enligt min mening finns det inget behov av att förklara nÄgonting hÀr, och sjÀlva basklassen Àr ganska enkel.

get_app()

LÄt oss lÀgga till en funktion för att skapa ett applikationsobjekt i app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

För nu kommer vi att ha den enklaste applikationsskapandet, lite senare kommer vi att utöka det, dock för att inte lÄta dig vÀnta, hÀr referenser till App-klass. Jag rÄder dig ocksÄ att ta en titt pÄ instÀllningsklassen, eftersom den Àr ansvarig för de flesta instÀllningarna.

Huvuddelen

Agent för insamling och underhÄll av en lista över vÀrdepapper

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

SÄ först fÄr vi faust-applikationsobjektet - det Àr ganska enkelt. DÀrefter deklarerar vi uttryckligen ett Àmne för vÄr agent... HÀr Àr det vÀrt att nÀmna vad det Àr, vad den interna parametern Àr och hur detta kan ordnas annorlunda.

  1. Ämnen i kafka, om vi vill veta den exakta definitionen Ă€r det bĂ€ttre att lĂ€sa av. dokumentera, eller sĂ„ kan du lĂ€sa kompendium pĂ„ HabrĂ© pĂ„ ryska, dĂ€r allt ocksĂ„ reflekteras ganska exakt :)

  2. Parameter intern, som beskrivs ganska bra i faust-dokumentet, tillÄter oss att konfigurera Àmnet direkt i koden, naturligtvis betyder detta parametrarna som tillhandahÄlls av faust-utvecklarna, till exempel: retention, retention policy (som standard radera, men du kan stÀlla in kompakt), antal partitioner per Àmne (poÀngatt göra till exempel mindre Àn global betydelse applikationer faust).

  3. I allmÀnhet kan agenten skapa ett hanterat Àmne med globala vÀrden, men jag gillar att deklarera allt explicit. Dessutom kan vissa parametrar (till exempel antalet partitioner eller lagringspolicy) för Àmnet i agentannonsen inte konfigureras.

    SÄ hÀr kan det se ut utan att manuellt definiera Àmnet:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

NÄvÀl, lÄt oss nu beskriva vad vÄr agent kommer att göra :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

SÄ i början av agenten öppnar vi en aiohttp-session för förfrÄgningar genom vÄr klient. SÄlunda, nÀr en arbetare startar, nÀr vÄr agent startas, kommer en session omedelbart att öppnas - en, under hela tiden arbetaren Àr igÄng (eller flera, om du Àndrar parametern samtidighet frÄn en agent med en standardenhet).

DÀrefter följer vi strömmen (vi placerar meddelandet i _, eftersom vi, i denna agent, inte bryr oss om innehÄllet) av meddelanden frÄn vÄrt Àmne, om de finns vid den aktuella offset, annars kommer vÄr cykel att vÀnta pÄ deras ankomst. Tja, inne i vÄr loop loggar vi mottagandet av meddelandet, fÄr en lista över aktiva (get_securities returnerar endast aktiva som standard, se kundkod) vÀrdepapper och sparar det i databasen, kontrollerar om det finns ett vÀrdepapper med samma ticker och utbyte i databasen, om det finns, kommer det (papperet) helt enkelt att uppdateras.

LÄt oss lansera vÄr skapelse!

> docker-compose up -d
... ЗапусĐș ĐșĐŸĐœŃ‚Đ”ĐčĐœĐ”Ń€ĐŸĐČ ...
> faust -A horton.agents worker --without-web -l info

PS funktioner webbkomponent Jag kommer inte att övervÀga faust i artiklarna, sÄ vi sÀtter rÀtt flagga.

I vÄrt startkommando berÀttade vi för faust var man skulle leta efter applikationsobjektet och vad man skulle göra med det (starta en arbetare) med informationsloggutdatanivÄn. Vi fÄr följande utdata:

Spoiler

ڟa”S† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┮───────────────────────────────────────────────────┘
... Đ»ĐŸĐłĐž, Đ»ĐŸĐłĐž, Đ»ĐŸĐłĐž ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┌─────────────
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┮────────────┘ 

Det Àr levande!!!

LÄt oss titta pÄ partitionsuppsÀttningen. Som vi kan se skapades ett Àmne med namnet som vi angav i koden, standardantalet partitioner (8, hÀmtat frÄn topic_partitions - application object parameter), eftersom vi inte angav ett individuellt vÀrde för vÄrt Àmne (via partitioner). Den startade agenten i arbetaren tilldelas alla 8 partitioner, eftersom det Àr den enda, men detta kommer att diskuteras mer detaljerat i delen om klustring.

NÄvÀl, nu kan vi gÄ till ett annat terminalfönster och skicka ett tomt meddelande till vÄrt Àmne:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS anvÀnder @ vi visar att vi skickar ett meddelande till ett Àmne som heter "collect_securities".

I det hÀr fallet gick meddelandet till partition 6 - du kan kontrollera detta genom att gÄ till kafdrop on localhost:9000

NÀr vi gÄr till terminalfönstret med vÄr arbetare kommer vi att se ett glatt meddelande skickat med loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Vi kan ocksÄ titta pÄ mongo (med Robo3T eller Studio3T) och se att vÀrdepapperen finns i databasen:

Jag Àr ingen miljardÀr, och dÀrför nöjer vi oss med det första visningsalternativet.

Bakgrundsuppgifter om Faust, del II: Agenter och teamBakgrundsuppgifter om Faust, del II: Agenter och team

Lycka och glÀdje - den första agenten Àr redo :)

Agent redo, lÀnge leve den nya agenten!

Ja, mina herrar, vi har bara tÀckt 1/3 av vÀgen som utarbetats av den hÀr artikeln, men var inte avskrÀckta, för nu kommer det att bli lÀttare.

SÄ nu behöver vi en agent som samlar in metainformation och lÀgger in den i ett insamlingsdokument:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Eftersom denna agent kommer att bearbeta information om ett specifikt vĂ€rdepapper, mĂ„ste vi ange tickern (symbolen) för detta vĂ€rdepapper i meddelandet. För detta Ă€ndamĂ„l i faust finns det Register — klasser som deklarerar meddelandeschemat i agentĂ€mnet.

I det hÀr fallet, lÄt oss gÄ till records.py och beskriv hur meddelandet för detta Àmne ska se ut:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Som du kanske har gissat anvÀnder faust annoteringen av pythontyp för att beskriva meddelandeschemat, varför den minsta versionen som stöds av biblioteket Àr 3.6.

LÄt oss ÄtergÄ till agenten, stÀlla in typerna och lÀgga till den:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Som du kan se skickar vi en ny parameter med ett schema till Àmnesinitieringsmetoden - value_type. Dessutom följer allt samma schema, sÄ jag ser ingen mening med att uppehÄlla mig vid nÄgot annat.

Tja, sista handen Àr att lÀgga till ett samtal till agenten för insamling av metainformation till collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Vi anvÀnder det tidigare aviserade schemat för meddelandet. I det hÀr fallet anvÀnde jag .cast-metoden eftersom vi inte behöver vÀnta pÄ resultatet frÄn agenten, men det Àr vÀrt att nÀmna att sÀtt skicka ett meddelande till Àmnet:

  1. cast - blockerar inte eftersom den inte förvÀntar sig ett resultat. Du kan inte skicka resultatet till ett annat Àmne som ett meddelande.

  2. skicka - blockerar inte eftersom det inte förvÀntar sig ett resultat. Du kan ange en agent i Àmnet som resultatet ska gÄ till.

  3. frÄga - vÀntar pÄ ett resultat. Du kan ange en agent i Àmnet som resultatet ska gÄ till.

SÄ, det Àr allt med agenter för idag!

Dröm teamet

Det sista jag lovade att skriva i den hÀr delen Àr kommandon. Som nÀmnts tidigare Àr kommandon i faust ett omslag runt klick. Faktum Àr att faust helt enkelt kopplar vÄrt anpassade kommando till dess grÀnssnitt nÀr du anger -A-tangenten

Efter de annonserade agenterna in agents.py lÀgg till en funktion med en dekoratör app.commandkallar metoden gjutas у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

SÄledes, om vi anropar listan med kommandon, kommer vÄrt nya kommando att finnas i den:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Vi kan anvÀnda det som alla andra, sÄ lÄt oss starta om faust-arbetaren och pÄbörja en fullfjÀdrad samling av vÀrdepapper:

> faust -A horton.agents start-collect-securities

Vad kommer hÀnda hÀrnÀst?

I nÀsta del, med de ÄterstÄende agenterna som exempel, kommer vi att övervÀga sjunkmekanismen för att söka efter ytterligheter i stÀngningspriserna för handel för Äret och kronlanseringen av agenter.

Det Àr allt för idag! Tack för att du lÀser :)

Kod för denna del

Bakgrundsuppgifter om Faust, del II: Agenter och team

PS Under den sista delen fick jag frÄgan om faust och confluent kafka (vilka egenskaper har confluent?). Det verkar som att confluent Àr mer funktionellt pÄ mÄnga sÀtt, men faktum Àr att faust inte har fullt klientstöd för confluent - detta följer av beskrivningar av klientrestriktioner i dokumentet.

KĂ€lla: will.com

LĂ€gg en kommentar