Agtergrondtake oor Faust, Deel II: Agente en spanne

Agtergrondtake oor Faust, Deel II: Agente en spanne

Inhoudsopgawe

  1. Deel I: Inleiding

  2. Deel II: Agente en spanne

Wat maak ons ​​hier?

So, so, die tweede deel. Soos vroeër geskryf, sal ons die volgende doen:

  1. Kom ons skryf 'n klein kliënt vir alphavantage op aiohttp met versoeke vir die eindpunte wat ons benodig.

  2. Kom ons skep 'n agent wat data oor sekuriteite en meta-inligting daaroor sal insamel.

Maar dit is wat ons vir die projek self sal doen, en in terme van faustnavorsing, sal ons leer hoe om agente te skryf wat stroomgebeure vanaf kafka verwerk, asook hoe om opdragte (click wrapper) te skryf, in ons geval - vir handmatige drukboodskappe na die onderwerp wat die agent monitor.

Opleiding

AlphaVantage kliënt

Kom ons skryf eers 'n klein aiohttp-kliënt vir versoeke om alphavantage.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Eintlik is alles duidelik daaruit:

  1. Die AlphaVantage API is eenvoudig en pragtig ontwerp, so ek het besluit om alle versoeke deur die metode te rig construct_query waar daar weer 'n http-oproep is.

  2. Ek bring al die velde na snake_case vir troos.

  3. Wel, die logger.catch-versiering vir pragtige en insiggewende naspooruitset.

NS Moenie vergeet om die alphavantage-token plaaslik by config.yml by te voeg, of die omgewingsveranderlike uit te voer nie HORTON_SERVICE_APIKEY. Ons ontvang 'n teken hier.

CRUD klas

Ons sal 'n sekuriteitsversameling hê om meta-inligting oor sekuriteite te stoor.

databasis/sekuriteit.py

Na my mening is dit nie nodig om enigiets hier te verduidelik nie, en die basisklas self is redelik eenvoudig.

kry_app()

Kom ons voeg 'n funksie by om 'n toepassingsvoorwerp in te skep app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Vir nou sal ons die eenvoudigste toepassingskepping hê, 'n bietjie later sal ons dit egter uitbrei om nie te laat wag nie, hier verwysings na App-klas. Ek raai jou ook aan om na die instellingsklas te kyk, aangesien dit verantwoordelik is vir die meeste van die instellings.

Hoofliggaam

Agent vir die insameling en instandhouding van 'n lys van sekuriteite

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

So, eers kry ons die faust-toepassingsvoorwerp - dit is redelik eenvoudig. Vervolgens verklaar ons uitdruklik 'n onderwerp vir ons agent ... Hier is dit die moeite werd om te noem wat dit is, wat die interne parameter is en hoe dit anders gereël kan word.

  1. Onderwerpe in kafka, as ons die presiese definisie wil weet, is dit beter om te lees af. dokument, of jy kan lees kompendium op Habré in Russies, waar alles ook redelik akkuraat weerspieël word :)

  2. Interne parameter, wat redelik goed beskryf word in die faust doc, stel ons in staat om die onderwerp direk in die kode op te stel, natuurlik, dit beteken die parameters wat deur die faust-ontwikkelaars verskaf word, byvoorbeeld: retensie, retensiebeleid (by verstek verwyder, maar jy kan stel compact), aantal partisies per onderwerp (tellingsom byvoorbeeld minder te doen as wêreldwye betekenis toepassings faust).

  3. Oor die algemeen kan die agent 'n bestuurde onderwerp met globale waardes skep, maar ek hou daarvan om alles uitdruklik te verklaar. Daarbenewens kan sommige parameters (byvoorbeeld die aantal partisies of retensiebeleid) van die onderwerp in die agentadvertensie nie opgestel word nie.

    Hier is hoe dit kan lyk sonder om die onderwerp handmatig te definieer:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Wel, laat ons nou beskryf wat ons agent sal doen :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Dus, aan die begin van die agent, maak ons ​​'n aiohttp-sessie oop vir versoeke deur ons kliënt. Dus, wanneer 'n werker begin word, wanneer ons agent geloods word, sal 'n sessie onmiddellik oopgemaak word - een, vir die hele tyd wat die werker aan die gang is (of verskeie, as jy die parameter verander gelyktydigheid van 'n agent met 'n verstekeenheid).

Volgende volg ons die stroom (ons plaas die boodskap in _, aangesien ons, in hierdie agent, nie omgee vir die inhoud) van boodskappe uit ons onderwerp, as hulle by die huidige afwyking bestaan, anders sal ons siklus wag vir hul aankoms. Wel, binne ons lus teken ons die ontvangs van die boodskap aan, kry 'n lys van aktiewe (get_securities gee slegs aktief by verstek terug, sien kliëntkode) sekuriteite en stoor dit in die databasis, en kyk of daar 'n sekuriteit is met dieselfde tikker en ruil in die databasis , as daar is, dan sal dit (die vraestel) eenvoudig opgedateer word.

Kom ons begin ons skepping!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS Kenmerke web komponent Ek sal nie faust in die artikels oorweeg nie, so ons stel die toepaslike vlag.

In ons bekendstellingsbevel het ons vir faust gesê waar om na die toepassingsvoorwerp te soek en wat om daarmee te doen (begin 'n werker) met die infolog-uitsetvlak. Ons kry die volgende uitset:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Dis lewendig!!!

Kom ons kyk na die partisiestel. Soos ons kan sien, is 'n onderwerp geskep met die naam wat ons in die kode aangewys het, die verstek aantal partisies (8, geneem uit onderwerp_partisies - toepassingsobjekparameter), aangesien ons nie 'n individuele waarde vir ons onderwerp gespesifiseer het nie (via partisies). Aan die geloodsde agent in die werker word al 8 partisies toegewys, aangesien dit die enigste een is, maar dit sal in meer besonderhede in die deel oor groepering bespreek word.

Wel, nou kan ons na 'n ander terminale venster gaan en 'n leë boodskap na ons onderwerp stuur:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS gebruik @ ons wys dat ons 'n boodskap stuur na 'n onderwerp genaamd "versamel_sekuriteite".

In hierdie geval het die boodskap na partisie 6 gegaan - jy kan dit nagaan deur na kafdrop aan te gaan localhost:9000

As ons saam met ons werker na die terminale venster gaan, sal ons 'n gelukkige boodskap sien wat met loguru gestuur word:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Ons kan ook na mongo kyk (met behulp van Robo3T of Studio3T) en sien dat die sekuriteite in die databasis is:

Ek is nie 'n miljardêr nie, en daarom is ons tevrede met die eerste kykopsie.

Agtergrondtake oor Faust, Deel II: Agente en spanneAgtergrondtake oor Faust, Deel II: Agente en spanne

Geluk en vreugde - die eerste agent is gereed :)

Agent gereed, lank lewe die nuwe agent!

Ja, here, ons het net 1/3 van die pad afgelê wat deur hierdie artikel voorberei is, maar moenie moedeloos wees nie, want nou sal dit makliker wees.

So nou het ons 'n agent nodig wat meta-inligting insamel en dit in 'n versamelingsdokument plaas:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Aangesien hierdie agent inligting oor 'n spesifieke sekuriteit sal verwerk, moet ons die tikker (simbool) van hierdie sekuriteit in die boodskap aandui. Vir hierdie doel in faust is daar Rekords — klasse wat die boodskapskema in die agentonderwerp verklaar.

In hierdie geval, kom ons gaan na rekords.py en beskryf hoe die boodskap vir hierdie onderwerp moet lyk:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Soos jy dalk geraai het, gebruik faust die python-tipe annotasie om die boodskapskema te beskryf, en daarom is die minimum weergawe wat deur die biblioteek ondersteun word 3.6.

Kom ons keer terug na die agent, stel die tipes in en voeg dit by:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Soos u kan sien, gee ons 'n nuwe parameter met 'n skema deur na die onderwerp-inisialiseringsmetode - waarde_tipe. Verder volg alles dieselfde skema, so ek sien geen sin daarin om by enigiets anders stil te staan ​​nie.

Wel, die finale aanraking is om 'n oproep by die meta-inligtingversamelingsagent te voeg na collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Ons gebruik die voorheen aangekondigde skema vir die boodskap. In hierdie geval het ek die .cast-metode gebruik aangesien ons nie hoef te wag vir die uitslag van die agent nie, maar dit is die moeite werd om te noem dat maniere stuur 'n boodskap na die onderwerp:

  1. cast - blokkeer nie omdat dit nie 'n resultaat verwag nie. Jy kan nie die resultaat as 'n boodskap na 'n ander onderwerp stuur nie.

  2. stuur - blokkeer nie omdat dit nie 'n resultaat verwag nie. Jy kan 'n agent in die onderwerp spesifiseer waarna die resultaat sal gaan.

  3. vra - wag vir 'n uitslag. Jy kan 'n agent in die onderwerp spesifiseer waarna die resultaat sal gaan.

So, dit is alles met agente vir vandag!

Die droomspan

Die laaste ding wat ek belowe het om in hierdie deel te skryf, is opdragte. Soos vroeër genoem, is opdragte in faust 'n omhulsel rondom klik. Trouens, faust heg eenvoudig ons persoonlike opdrag aan sy koppelvlak wanneer die -A-sleutel gespesifiseer word

Nadat die aangekondigde agente in agents.py voeg 'n funksie met 'n versierder by app.opdragroep die metode gooi у versamel_sekuriteite:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Dus, as ons die lys van opdragte noem, sal ons nuwe opdrag daarin wees:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Ons kan dit gebruik soos enigiemand anders, so kom ons begin die faust-werker weer en begin met 'n volwaardige versameling van sekuriteite:

> faust -A horton.agents start-collect-securities

Wat sal volgende gebeur?

In die volgende deel, met die oorblywende agente as 'n voorbeeld, sal ons die sinkmeganisme oorweeg om na uiterstes te soek in die sluitingspryse van verhandeling vir die jaar en die kroonbekendstelling van agente.

Dis al vir vandag! Dankie vir die lees :)

Kode vir hierdie deel

Agtergrondtake oor Faust, Deel II: Agente en spanne

NS Onder die laaste deel is ek gevra oor faust en konfluent kafka (watter kenmerke het confluent?). Dit blyk dat confluent in baie opsigte meer funksioneel is, maar die feit is dat faust nie volle kliëntondersteuning vir confluent het nie - dit volg uit beskrywings van kliëntbeperkings in die dokument.

Bron: will.com

Voeg 'n opmerking