Kazi za usuli kwenye Faust, Sehemu ya II: Mawakala na Timu

Kazi za usuli kwenye Faust, Sehemu ya II: Mawakala na Timu

Meza ya yaliyomo

  1. Sehemu ya I: Utangulizi

  2. Sehemu ya II: Mawakala na Timu

Tunafanya nini hapa?

Kwa hivyo, sehemu ya pili. Kama ilivyoandikwa hapo awali, ndani yake tutafanya yafuatayo:

  1. Wacha tuandike mteja mdogo kwa alfavantage kwenye aiohttp na maombi ya miisho tunayohitaji.

  2. Hebu tuunde wakala ambaye atakusanya data kuhusu dhamana na taarifa za meta kuzihusu.

Lakini, hii ndio tutafanya kwa mradi wenyewe, na kwa suala la utafiti wa haraka, tutajifunza jinsi ya kuandika mawakala ambao hushughulikia matukio ya mkondo kutoka kwa kafka, na pia jinsi ya kuandika amri (bonyeza wrapper), kwa upande wetu - kwa ujumbe wa kusukuma kwa mikono kwa mada ambayo wakala anafuatilia.

Mafunzo ya

Mteja wa AlphaVantage

Kwanza, hebu tuandike mteja mdogo wa aiohttp kwa maombi ya alphavantage.

alphavantage.py

spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Kwa kweli, kila kitu ni wazi kutoka kwake:

  1. API ya AlphaVantage imeundwa kwa urahisi na uzuri, kwa hivyo niliamua kufanya maombi yote kupitia njia hiyo construct_query ambapo kwa upande kuna simu ya http.

  2. Ninaleta mashamba yote snake_case kwa urahisi.

  3. Vizuri, mapambo ya logger.catch kwa matokeo mazuri na ya kuelimisha ya ufuatiliaji.

PS Usisahau kuongeza tokeni ya alfavantage ndani ya nchi kwenye config.yml, au kuuza nje tofauti ya mazingira. HORTON_SERVICE_APIKEY. Tunapokea ishara hapa.

darasa la CRUD

Tutakuwa na mkusanyiko wa dhamana ili kuhifadhi maelezo ya meta kuhusu dhamana.

hifadhidata/usalama.py

Kwa maoni yangu, hakuna haja ya kuelezea chochote hapa, na darasa la msingi yenyewe ni rahisi sana.

pata_programu()

Wacha tuongeze kitendakazi cha kuunda kitu cha programu programu.py

spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Kwa sasa tutakuwa na uundaji rahisi zaidi wa programu, baadaye kidogo tutaipanua, hata hivyo, ili tusikuweke kusubiri, hapa marejeleo kwa darasa la programu. Ninakushauri pia uangalie darasa la mipangilio, kwa kuwa linawajibika kwa mipangilio mingi.

Mwili kuu

Wakala wa kukusanya na kutunza orodha ya dhamana

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Kwa hivyo, kwanza tunapata kitu cha maombi cha haraka zaidi - ni rahisi sana. Halafu, tunatangaza kwa uwazi mada kwa wakala wetu ... Hapa ni muhimu kutaja ni nini, ni nini parameter ya ndani na jinsi hii inaweza kupangwa tofauti.

  1. Mada katika kafka, ikiwa tunataka kujua ufafanuzi halisi, ni bora kusoma imezimwa. hati, au unaweza kusoma muunganisho juu ya Habre kwa Kirusi, ambapo kila kitu pia kinaonyeshwa kwa usahihi kabisa :)

  2. Kigezo cha ndani, iliyoelezewa vizuri katika hati mbaya, inaturuhusu kusanidi mada moja kwa moja kwenye msimbo, bila shaka, hii inamaanisha vigezo vilivyotolewa na watengenezaji wa kasi, kwa mfano: uhifadhi, sera ya uhifadhi (kwa kufuta chaguo-msingi, lakini unaweza kuweka Compact), idadi ya sehemu kwa kila mada (vipandekufanya, kwa mfano, chini ya umuhimu wa kimataifa maombi haraka).

  3. Kwa ujumla, wakala anaweza kuunda mada inayosimamiwa na maadili ya kimataifa, hata hivyo, napenda kutangaza kila kitu kwa uwazi. Kwa kuongeza, baadhi ya vigezo (kwa mfano, idadi ya partitions au sera ya uhifadhi) ya mada katika tangazo la wakala haiwezi kusanidiwa.

    Hivi ndivyo inavyoweza kuonekana bila kufafanua mada mwenyewe:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Kweli, sasa hebu tueleze kile wakala wetu atafanya :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Kwa hivyo, mwanzoni mwa wakala, tunafungua kipindi cha aiohttp kwa maombi kupitia mteja wetu. Kwa hivyo, wakati wa kuanza mfanyakazi, wakati wakala wetu atazinduliwa, kikao kitafunguliwa mara moja - moja, kwa muda wote mfanyakazi anaendesha (au kadhaa, ikiwa unabadilisha parameter. concurrency kutoka kwa wakala aliye na kitengo chaguo-msingi).

Ifuatayo, tunafuata mkondo (tunaweka ujumbe ndani _, kwa kuwa sisi, katika wakala huyu, hatujali kuhusu maudhui) ya ujumbe kutoka kwa mada yetu, ikiwa zipo kwa kukabiliana na sasa, vinginevyo mzunguko wetu utasubiri kuwasili kwao. Kweli, ndani ya kitanzi chetu, tunaweka risiti ya ujumbe, pata orodha ya kazi (get_securities inarudi tu kwa chaguo-msingi, angalia nambari ya mteja) na uihifadhi kwenye hifadhidata, ukiangalia ikiwa kuna usalama na ticker sawa na. kubadilishana kwenye hifadhidata , ikiwa iko, basi (karatasi) itasasishwa tu.

Wacha tuanzishe uumbaji wetu!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Vipengele vya PS sehemu ya wavuti Sitazingatia faust katika makala, kwa hivyo tunaweka bendera inayofaa.

Katika amri yetu ya uzinduzi, tuliiambia faust mahali pa kutafuta kitu cha maombi na nini cha kufanya nacho (zindua mfanyakazi) na kiwango cha pato la logi ya habari. Tunapata pato lifuatalo:

spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Ni hai!!!

Wacha tuangalie seti ya kizigeu. Kama tunavyoona, mada iliundwa na jina ambalo tuliteua katika nambari, nambari chaguo-msingi ya sehemu (8, zilizochukuliwa kutoka. mada_partitions - parameta ya kitu cha maombi), kwani hatukutaja thamani ya mtu binafsi kwa mada yetu (kupitia partitions). Wakala aliyezinduliwa katika mfanyakazi amepewa sehemu zote 8, kwa kuwa ndiye pekee, lakini hii itajadiliwa kwa undani zaidi katika sehemu kuhusu nguzo.

Kweli, sasa tunaweza kwenda kwenye dirisha lingine la terminal na kutuma ujumbe tupu kwa mada yetu:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS kutumia @ tunaonyesha kwamba tunatuma ujumbe kwa mada inayoitwa "collect_securities".

Katika kesi hii, ujumbe ulikwenda kwa kizigeu 6 - unaweza kuangalia hii kwa kwenda kafdrop on localhost:9000

Kwenda kwenye dirisha la terminal na mfanyakazi wetu, tutaona ujumbe wa furaha uliotumwa kwa kutumia loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Tunaweza pia kuangalia mongo (kwa kutumia Robo3T au Studio3T) na kuona kuwa dhamana ziko kwenye hifadhidata:

Mimi sio bilionea, na kwa hivyo tunaridhika na chaguo la kwanza la kutazama.

Kazi za usuli kwenye Faust, Sehemu ya II: Mawakala na TimuKazi za usuli kwenye Faust, Sehemu ya II: Mawakala na Timu

Furaha na furaha - wakala wa kwanza yuko tayari :)

Wakala yuko tayari, ishi kwa muda mrefu wakala mpya!

Ndiyo, waheshimiwa, tumefunika tu 1/3 ya njia iliyoandaliwa na makala hii, lakini usivunja moyo, kwa sababu sasa itakuwa rahisi zaidi.

Kwa hivyo sasa tunahitaji wakala anayekusanya taarifa za meta na kuziweka kwenye hati ya mkusanyiko:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Kwa kuwa wakala huyu atachakata taarifa kuhusu usalama mahususi, tunahitaji kuonyesha tiki (alama) ya usalama huu kwenye ujumbe. Kwa lengo hili katika faust kuna Kumbukumbu - madarasa yanayotangaza mpango wa ujumbe katika mada ya wakala.

Katika kesi hii, wacha tuende kumbukumbu.py na ueleze jinsi ujumbe wa mada hii unapaswa kuonekana:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Kama unavyoweza kukisia, faust hutumia maelezo ya aina ya python kuelezea schema ya ujumbe, ndiyo sababu toleo la chini linaloungwa mkono na maktaba ni. 3.6.

Wacha turudi kwa wakala, weka aina na uiongeze:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Kama unavyoona, tunapitisha kigezo kipya na mpango kwa njia ya uanzishaji wa mada - value_type. Zaidi ya hayo, kila kitu kinafuata mpango huo huo, kwa hivyo sioni hatua yoyote ya kukaa juu ya kitu kingine chochote.

Kweli, mguso wa mwisho ni kuongeza simu kwa wakala wa kukusanya habari za meta kukusanya_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Tunatumia mpango uliotangazwa hapo awali kwa ujumbe. Katika kesi hii, nilitumia njia ya .cast kwani hatuhitaji kungoja matokeo kutoka kwa wakala, lakini inafaa kutaja hilo. njia tuma ujumbe kwa mada:

  1. kutupwa - haizuii kwa sababu haitarajii matokeo. Huwezi kutuma matokeo kwa mada nyingine kama ujumbe.

  2. kutuma - haizuii kwa sababu haitarajii matokeo. Unaweza kutaja wakala katika mada ambayo matokeo yataenda.

  3. kuuliza - kusubiri matokeo. Unaweza kutaja wakala katika mada ambayo matokeo yataenda.

Kwa hivyo, ni hayo tu na mawakala wa leo!

Timu ya ndoto

Jambo la mwisho nililoahidi kuandika katika sehemu hii ni amri. Kama ilivyoelezwa hapo awali, amri katika faust ni wrapper karibu na kubofya. Kwa kweli, faust inashikilia tu amri yetu maalum kwenye kiolesura chake wakati wa kubainisha -A ufunguo

Baada ya mawakala waliotangazwa kuingia mawakala.py ongeza kazi na mpambaji app.amrikuita mbinu kutupwa у kukusanya_dhamana:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Kwa hivyo, ikiwa tunaita orodha ya amri, amri yetu mpya itakuwa ndani yake:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Tunaweza kuitumia kama mtu mwingine yeyote, kwa hivyo wacha tuanze tena mfanyakazi asiye na kasi na tuanze mkusanyiko kamili wa dhamana:

> faust -A horton.agents start-collect-securities

Je, nini kitafuata?

Katika sehemu inayofuata, kwa kutumia mawakala waliosalia kama mfano, tutazingatia utaratibu wa kuzama kwa ajili ya kutafuta viwango vya juu katika bei za kufunga za biashara kwa mwaka na uzinduzi wa cron wa mawakala.

Ni hayo tu kwa leo! Asante kwa kusoma :)

Kanuni ya sehemu hii

Kazi za usuli kwenye Faust, Sehemu ya II: Mawakala na Timu

PS Chini ya sehemu ya mwisho niliulizwa kuhusu kafka faust na confluent (ni sifa gani confluent ina?) Inaonekana kwamba muungano unafanya kazi zaidi kwa njia nyingi, lakini ukweli ni kwamba faust haina usaidizi kamili wa mteja kwa washiriki - hii inafuata kutoka maelezo ya vikwazo vya mteja katika hati.

Chanzo: mapenzi.com

Kuongeza maoni