Bakgrunnsverkefni um Faust, Part II: Agents and Teams

Bakgrunnsverkefni um Faust, Part II: Agents and Teams

efnisyfirlit

  1. I. hluti: Inngangur

  2. Part II: Umboðsmenn og lið

Hvað erum við að gera hér?

Svo, svo, seinni hlutinn. Eins og skrifað var áðan munum við gera eftirfarandi:

  1. Við skulum skrifa lítinn viðskiptavin fyrir alphavantage á aiohttp með beiðnum um endapunktana sem við þurfum.

  2. Búum til umboðsmann sem mun safna gögnum um verðbréf og metaupplýsingar um þau.

En, þetta er það sem við munum gera fyrir verkefnið sjálft, og hvað varðar nákvæmar rannsóknir, munum við læra hvernig á að skrifa umboðsmenn sem vinna straumviðburði frá kafka, sem og hvernig á að skrifa skipanir (smelltu umbúðir), í okkar tilviki - fyrir handvirk ýtt skilaboð til efnisins sem umboðsmaðurinn er að fylgjast með.

Þjálfun

AlphaVantage viðskiptavinur

Í fyrsta lagi skulum við skrifa lítinn aiohttp viðskiptavin fyrir beiðnir um alphavantage.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Reyndar er allt ljóst af því:

  1. AlphaVantage API er einfaldlega og fallega hannað, svo ég ákvað að gera allar beiðnir í gegnum aðferðina construct_query þar sem aftur er http símtal.

  2. Ég kem með alla vellina til snake_case til þæginda.

  3. Jæja, logger.catch skreytingin fyrir fallega og upplýsandi rakningarútgang.

PS Ekki gleyma að bæta alphavantage tákninu á staðnum við config.yml, eða flytja út umhverfisbreytuna HORTON_SERVICE_APIKEY. Við fáum tákn hér.

CRUD flokkur

Við verðum með verðbréfasafn til að geyma metaupplýsingar um verðbréf.

gagnagrunnur/öryggi.py

Að mínu mati er óþarfi að útskýra neitt hér og grunnflokkurinn sjálfur er frekar einfaldur.

fá_app()

Við skulum bæta við aðgerð til að búa til forritshlut í app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Í bili munum við búa til einfaldasta forritið, aðeins síðar munum við stækka það, þó til að láta þig ekki bíða, hér tilvísanir í App-flokki. Ég ráðlegg þér líka að kíkja á stillingaflokkinn þar sem hann er ábyrgur fyrir flestum stillingum.

Helstu hluti

Umboðsaðili fyrir söfnun og viðhald lista yfir verðbréf

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Svo fyrst fáum við fatast forritshlutinn - hann er frekar einfaldur. Næst lýsum við skýrt yfir efni fyrir umboðsmann okkar... Hér er rétt að minnast á hvað það er, hver innri færibreytan er og hvernig hægt er að raða þessu öðruvísi.

  1. Efni í kafka, ef við viljum vita nákvæma skilgreiningu, þá er betra að lesa af. skjal, eða þú getur lesið samantekt á Habré á rússnesku, þar sem allt endurspeglast líka nokkuð nákvæmlega :)

  2. Innri færibreyta, sem lýst er nokkuð vel í faust skjalinu, gerir okkur kleift að stilla efnið beint í kóðann, auðvitað þýðir þetta færibreyturnar sem faust forritararnir gefa upp, til dæmis: varðveisla, varðveislustefna (sjálfgefið að eyða, en þú getur stillt samningur), fjöldi skiptinga á hverju efni (skoraað gera til dæmis minna en alþjóðlegt mikilvægi forritin eru fljót).

  3. Almennt séð getur umboðsmaðurinn búið til stýrt efni með alþjóðlegum gildum, hins vegar finnst mér gaman að lýsa öllu yfir með skýrum hætti. Að auki er ekki hægt að stilla sumar færibreytur (til dæmis fjölda skiptinga eða varðveislustefnu) efnisins í auglýsingunni um umboðsmann.

    Svona gæti það litið út án þess að skilgreina efnið handvirkt:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Jæja, nú skulum við lýsa því hvað umboðsmaður okkar mun gera :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Svo, í upphafi umboðsmannsins, opnum við aiohttp fundi fyrir beiðnir í gegnum viðskiptavini okkar. Þannig að þegar starfsmaður er ræstur, þegar umboðsmaður okkar er ræstur, verður fundur strax opnaður - einn, allan tímann sem starfsmaðurinn er í gangi (eða fleiri, ef þú breytir færibreytunni samtímis frá umboðsmanni með sjálfgefna einingu).

Næst fylgjumst við með straumnum (við setjum skilaboðin inn _, þar sem okkur, í þessum umboðsmanni, er alveg sama um innihald) skilaboða frá umræðuefninu okkar, ef þau eru til á núverandi móti, annars mun hringrás okkar bíða eftir komu þeirra. Jæja, inni í lykkjunni okkar skráum við móttöku skilaboðanna, fáum lista yfir virk (get_securities skilar aðeins virkum sjálfgefið, sjá kóða viðskiptavinar) verðbréf og vistum það í gagnagrunninum, athugum hvort það sé verðbréf með sama auðkenni og skipti í gagnagrunninum, ef það er, þá verður það (blaðið) einfaldlega uppfært.

Við skulum hefja sköpun okkar!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS eiginleikar vefhluti Ég mun ekki íhuga faust í greinunum, svo við setjum viðeigandi fána.

Í ræsingarskipuninni okkar sögðum við faust hvar ætti að leita að forritshlutnum og hvað ætti að gera við hann (ræsa starfsmann) með úttaksstigi upplýsingaskrárinnar. Við fáum eftirfarandi úttak:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Það er á lífi!!!

Við skulum líta á skiptingarsettið. Eins og við sjáum var búið til umræðuefni með nafninu sem við tilgreindum í kóðanum, sjálfgefinn fjölda skiptinga (8, tekið úr efnisskiptingar - færibreytu forritshluta), þar sem við tilgreindum ekki einstakt gildi fyrir efnið okkar (í gegnum skipting). Hinn ræsti umboðsmaður í starfsmanninum er úthlutað öllum 8 skiptingunum, þar sem hann er sá eini, en það verður fjallað nánar um það í hlutanum um þyrping.

Jæja, nú getum við farið í annan flugstöðvarglugga og sent tóm skilaboð í efnið okkar:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS að nota @ við sýnum að við erum að senda skilaboð til efnis sem heitir „safna_verðbréf“.

Í þessu tilviki fóru skilaboðin í skipting 6 - þú getur athugað þetta með því að fara í kafdrop on localhost:9000

Þegar við förum í flugstöðvargluggann með starfsmanninum okkar munum við sjá ánægjuleg skilaboð send með loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Við getum líka skoðað mongo (með því að nota Robo3T eða Studio3T) og sjá að verðbréfin eru í gagnagrunninum:

Ég er ekki milljarðamæringur og þess vegna erum við sátt við fyrsta áhorfsvalkostinn.

Bakgrunnsverkefni um Faust, Part II: Agents and TeamsBakgrunnsverkefni um Faust, Part II: Agents and Teams

Hamingja og gleði - fyrsti umboðsmaðurinn er tilbúinn :)

Umboðsmaður tilbúinn, lengi lifi nýi umboðsmaðurinn!

Já, herrar mínir, við höfum aðeins farið yfir 1/3 af leiðinni sem útbúin er með þessari grein, en ekki hugfallast, því nú verður það auðveldara.

Svo núna þurfum við umboðsmann sem safnar metaupplýsingum og setur þær í söfnunarskjal:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Þar sem þessi umboðsmaður mun vinna úr upplýsingum um tiltekið verðbréf, þurfum við að gefa til kynna auðkenni (tákn) þessa verðbréfs í skilaboðunum. Í þessu skyni í faust eru Skrár - flokkar sem lýsa yfir skilaboðakerfinu í umboðsefninu.

Í þessu tilfelli skulum við fara til records.py og lýstu hvernig skilaboðin fyrir þetta efni ættu að líta út:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Eins og þú gætir hafa giskað á, notar faust python-tegundarskýringuna til að lýsa skilaboðaskemmunni, sem er ástæðan fyrir því að lágmarksútgáfan sem styður bókasafnið er 3.6.

Við skulum snúa aftur til umboðsmannsins, stilla tegundirnar og bæta því við:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Eins og þú sérð sendum við nýja færibreytu með kerfi til upphafsaðferðar efnisins - value_type. Ennfremur fylgir allt sama kerfi, svo ég sé ekki tilgang í að dvelja við neitt annað.

Jæja, lokahnykkurinn er að bæta við símtali við metaupplýsingasöfnunaraðilann til collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Við notum áður tilkynnt kerfi fyrir skilaboðin. Í þessu tilviki notaði ég .cast aðferðina þar sem við þurfum ekki að bíða eftir niðurstöðunni frá umboðsmanni, en þess má geta að leiðir sendu skilaboð á efnið:

  1. cast - blokkar ekki vegna þess að það býst ekki við niðurstöðu. Þú getur ekki sent niðurstöðuna í annað efni sem skilaboð.

  2. send - lokar ekki vegna þess að það býst ekki við niðurstöðu. Þú getur tilgreint umboðsmann í efninu sem niðurstaðan mun fara í.

  3. spyrja - bíður eftir niðurstöðu. Þú getur tilgreint umboðsmann í efninu sem niðurstaðan mun fara í.

Svo, það er allt með umboðsmenn í dag!

Draumaliðið

Það síðasta sem ég lofaði að skrifa í þessum hluta eru skipanir. Eins og fyrr segir eru skipanir í faust umbúðir utan um smell. Reyndar festir faust einfaldlega sérsniðna skipun okkar við viðmót sitt þegar hann tilgreinir -A lykilinn

Eftir að tilkynntir umboðsmenn í agents.py bæta við aðgerð með skreytingamanni app.skipunkalla aðferðina kastað у safna_verðbréfum:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Þannig að ef við köllum listann yfir skipanir mun nýja skipunin okkar vera í honum:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Við getum notað það eins og hvern sem er, svo við skulum endurræsa Faust-starfsmanninn og hefja fullbúið safn af verðbréfum:

> faust -A horton.agents start-collect-securities

Hvað gerist næst?

Í næsta hluta, með því að nota umboðsmennina sem eftir eru sem dæmi, munum við íhuga vaskabúnaðinn til að leita að öfgum í lokaverði viðskipta á árinu og upphafssetningu umboðsmanna.

Það er allt í dag! Takk fyrir að lesa :)

Kóði fyrir þennan hluta

Bakgrunnsverkefni um Faust, Part II: Agents and Teams

PS Undir síðasta hlutanum var ég spurður um faust og confluent kafka (hvaða eiginleika hefur confluent?). Svo virðist sem confluent sé virkari á margan hátt, en staðreyndin er sú að faust hefur ekki fullan stuðning viðskiptavina fyrir confluent - þetta leiðir af lýsingar á takmörkunum viðskiptavina í skjalinu.

Heimild: www.habr.com

Bæta við athugasemd