Taustatehtävät aiheesta Faust, Osa II: Agentit ja tiimit

Taustatehtävät aiheesta Faust, Osa II: Agentit ja tiimit

sisällysluettelo

  1. Osa I: Johdanto

  2. Osa II: Agentit ja tiimit

Mitä me täällä teemme?

Joten, niin, toinen osa. Kuten aiemmin kirjoitettiin, teemme siinä seuraavat:

  1. Kirjoitetaan pieni asiakas alfavantagelle aiohttp:lle, jossa pyydetään tarvitsemiamme päätepisteitä.

  2. Luodaan agentti, joka kerää tietoja arvopapereista ja metatietoja niistä.

Mutta tämän me teemme itse projektille, ja faust-tutkimuksen kannalta opimme kirjoittamaan agentteja, jotka käsittelevät kafkan suoratoistotapahtumia, sekä kuinka kirjoittaa komentoja (klikkaa käärettä), meidän tapauksessamme - manuaalisille push-viesteille agentin valvomaan aiheeseen.

Koulutus

AlphaVantage-asiakas

Ensin kirjoitetaan pieni aiohttp-asiakas alfavantage-pyyntöjä varten.

alphavantage.py

Spoileri

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Itse asiassa kaikki on selvää siitä:

  1. AlphaVantage API on melko yksinkertainen ja kauniisti suunniteltu, joten päätin tehdä kaikki pyynnöt menetelmän kautta construct_query jossa vuorostaan ​​on http-kutsu.

  2. Tuon kaikki kentät snake_case mukavuuden vuoksi.

  3. Logger.catch-koristelu kauniiseen ja informatiiviseen jäljitystulokseen.

PS Älä unohda lisätä alphavantage-tunnusta paikallisesti config.yml-tiedostoon tai viedä ympäristömuuttujaa HORTON_SERVICE_APIKEY. Saamme merkin täällä.

CRUD-luokka

Meillä on arvopaperikokoelma tallentaaksemme metatietoja arvopapereista.

tietokanta/security.py

Mielestäni tässä ei tarvitse selittää mitään, ja itse perusluokka on melko yksinkertainen.

Hanki sovellus()

Lisätään funktio sovellusobjektin luomiseksi app.py

Spoileri

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Toistaiseksi meillä on yksinkertaisin sovellusten luominen, hieman myöhemmin laajennamme sitä, mutta jotta et joutuisi odottamaan, tässä viittauksia App-luokkaan. Suosittelen myös katsomaan asetusluokkaa, koska se vastaa suurimmasta osasta asetuksista.

Pääosa

Agentti arvopaperiluettelon keräämiseen ja ylläpitämiseen

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Joten ensin saamme faust-sovellusobjektin - se on melko yksinkertaista. Seuraavaksi ilmoitamme nimenomaisesti agentillemme aiheen... Tässä kannattaa mainita, mikä se on, mikä on sisäinen parametri ja miten tämä voidaan järjestää toisin.

  1. Kafkan aiheet, jos haluamme tietää tarkan määritelmän, on parempi lukea vinossa. asiakirja, tai voit lukea yhteenveto venäjänkielisellä Habrella, jossa kaikki heijastuu myös melko tarkasti :)

  2. Sisäinen parametri, joka on kuvattu melko hyvin faust-dokumentissa, mahdollistaa aiheen määrittämisen suoraan koodissa, tämä tarkoittaa tietysti faust-kehittäjien antamia parametreja, esimerkiksi: säilyttäminen, säilytyskäytäntö (oletuksena poistaa, mutta voit asettaa kompakti), osioiden lukumäärä aihetta kohti (tuloksettehdä esimerkiksi vähemmän kuin maailmanlaajuista merkitystä sovellukset faust).

  3. Yleensä agentti voi luoda hallitun aiheen globaaleilla arvoilla, mutta haluan ilmoittaa kaiken eksplisiittisesti. Lisäksi joitain agenttimainoksen aiheen parametreja (esimerkiksi osioiden lukumäärää tai säilytyskäytäntöä) ei voi määrittää.

    Tältä se saattaa näyttää ilman aiheen manuaalista määrittelyä:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

No, nyt kuvataan mitä agenttimme tekee :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Joten agentin alussa avaamme aiohttp-istunnon asiakkaamme kautta oleville pyynnöille. Siten työntekijää käynnistettäessä, kun agenttimme käynnistetään, istunto avautuu välittömästi - yksi, koko työntekijän ollessa käynnissä (tai useita, jos muutat parametria samanaikaisuuden agentilta, jolla on oletusyksikkö).

Seuraavaksi seuraamme streamia (sijoitamme viestin _, koska me tässä agentissa emme välitä aiheemme viestien sisällöstä, jos ne ovat olemassa nykyisellä siirrolla, muuten syklimme odottaa niiden saapumista. No, silmukassamme kirjaamme viestin vastaanoton, hankimme luettelon aktiivisista (get_securities palauttaa oletusarvoisesti vain aktiivisia, katso asiakaskoodi) arvopapereista ja tallennamme sen tietokantaan tarkistaen, onko arvopapereita samalla tickerillä ja vaihto tietokannassa, jos sellainen on, se (paperi) yksinkertaisesti päivitetään.

Aloitetaan luomuksemme!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS-ominaisuudet verkkokomponentti En ota Faustia artikkeleissa huomioon, joten asetamme oikean lipun.

Käynnistyskomennossamme kerroimme faustille, mistä sovellusobjektia etsitään ja mitä sillä tehdään (käynnistetään worker) infolokin lähtötasolla. Saamme seuraavan tuloksen:

Spoileri

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Se on elossa!!!

Katsotaanpa osiojoukkoa. Kuten näemme, aihe luotiin nimellä, jonka määritimme koodissa, oletusarvoisella osioiden määrällä (8, otettu topic_partitions - sovellusobjektiparametri), koska emme määrittäneet aiheellemme yksittäistä arvoa (osioiden kautta). Työntekijässä käynnistetylle agentille on osoitettu kaikki 8 osiota, koska se on ainoa, mutta tätä käsitellään tarkemmin klusterointia käsittelevässä osassa.

No, nyt voimme mennä toiseen pääteikkunaan ja lähettää tyhjän viestin aiheeseemme:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS käytössä @ näytämme, että lähetämme viestin aiheeseen nimeltä "kerää_arvopaperit".

Tässä tapauksessa viesti meni osioon 6 - voit tarkistaa tämän menemällä osoitteeseen kafdrop on localhost:9000

Menemme työntekijämme kanssa pääteikkunaan, näemme iloisen viestin, joka lähetetään logurulla:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Voimme myös tarkastella mongoa (Robo3T:llä tai Studio3T:llä) ja nähdä, että arvopaperit ovat tietokannassa:

En ole miljardööri, ja siksi olemme tyytyväisiä ensimmäiseen katseluvaihtoehtoon.

Taustatehtävät aiheesta Faust, Osa II: Agentit ja tiimitTaustatehtävät aiheesta Faust, Osa II: Agentit ja tiimit

Onnea ja iloa - ensimmäinen agentti on valmis :)

Agentti valmis, eläköön uusi agentti!

Kyllä, herrat, olemme käyneet läpi vain 1/3 tämän artikkelin laatimasta polusta, mutta älkää lannistuko, sillä nyt se on helpompaa.

Joten nyt tarvitsemme agentin, joka kerää metatietoja ja laittaa ne keräysasiakirjaan:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Koska tämä agentti käsittelee tietoja tietystä arvopaperista, meidän on ilmoitettava tämän suojauksen merkki (symboli) viestissä. Tätä tarkoitusta varten faustissa on Asiakirjat — luokat, jotka ilmoittavat viestimallin agenttiaiheessa.

Tässä tapauksessa mennään records.py ja kuvaile, miltä tämän aiheen viestin pitäisi näyttää:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Kuten arvata saattaa, faust käyttää python-tyyppistä huomautusta viestiskeeman kuvaamiseen, minkä vuoksi kirjaston tukema vähimmäisversio on 3.6.

Palataan agenttiin, asetetaan tyypit ja lisätään se:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Kuten näet, välitämme aiheen alustusmenetelmälle uuden parametrin kaavan kanssa - arvo_tyyppi. Lisäksi kaikki noudattaa samaa kaavaa, joten en näe mitään järkeä keskittyä mihinkään muuhun.

No, viimeinen silaus on lisätä kutsu metatietojen keräämisagentille collection_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Käytämme viestissä aiemmin ilmoitettua mallia. Tässä tapauksessa käytin .cast-menetelmää, koska meidän ei tarvitse odottaa agentin tulosta, mutta on syytä mainita, että tapoja lähetä viesti aiheeseen:

  1. heittää - ei estä, koska se ei odota tulosta. Tulosta ei voi lähettää viestinä toiseen aiheeseen.

  2. lähetä - ei estä, koska se ei odota tulosta. Voit määrittää aiheessa agentin, jolle tulos siirtyy.

  3. kysyä - odottaa tulosta. Voit määrittää aiheessa agentin, jolle tulos siirtyy.

Joten, tässä kaikki agenttien kanssa tälle päivälle!

Unelmajoukkue

Viimeinen asia, jonka lupasin kirjoittaa tähän osaan, ovat komennot. Kuten aiemmin mainittiin, faustin komennot ovat kääre napsautuksen ympärillä. Itse asiassa Faust yksinkertaisesti liittää mukautetun komentomme käyttöliittymäänsä määrittäessään -A-näppäintä

Ilmoitettujen agenttien jälkeen agents.py lisää toiminto sisustajalla app.commandkutsumalla menetelmää heittää у kerää_arvopapereita:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Joten jos kutsumme komentoluetteloa, uusi komentomme on siinä:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Voimme käyttää sitä kuten kuka tahansa, joten käynnistetään faust worker uudelleen ja aloitetaan täysi arvopaperien kerääminen:

> faust -A horton.agents start-collect-securities

Mitä seuraavaksi tapahtuu?

Seuraavassa osassa tarkastellaan jäljelle jääneitä agentteja esimerkkinä nielumekanismia äärimmäisyyksien etsimiseen vuoden kaupankäynnin päätöskursseista ja agenttien kruunujen lanseerauksesta.

Siinä kaikki tältä päivältä! Kiitos kun luit :)

Koodi tälle osalle

Taustatehtävät aiheesta Faust, Osa II: Agentit ja tiimit

PS Viimeisen osan alla minulta kysyttiin faust and confluent kafkasta (mitä ominaisuuksia confluentilla on?). Näyttää siltä, ​​että confluent on monella tapaa toimivampi, mutta tosiasia on, että Faustilla ei ole täyttä asiakastukea confluentille - tämä seuraa asiakasrajoitusten kuvaukset dokumentissa.

Lähde: will.com

Lisää kommentti