Nga mahi papamuri mo Faust, Wāhanga II: Nga Kaihoko me nga Kapa

Nga mahi papamuri mo Faust, Wāhanga II: Nga Kaihoko me nga Kapa

Ripanga o nga ihirangi

  1. Wāhanga I: Kupu Whakataki

  2. Wāhanga II: Nga Kaihoko me nga Kapa

Kei te aha tatou i konei?

Na, na, te wahanga tuarua. Ka rite ki te tuhi i mua, ka mahia e matou nga mea e whai ake nei:

  1. Me tuhi he kiritaki iti mo te alphavantage i runga i te aiohttp me nga tono mo nga pito e hiahiatia ana.

  2. Me hanga he kaihoko hei kohi raraunga mo nga peeke me nga korero meta mo ratou.

Engari, koinei te mea ka mahia e matou mo te kaupapa ake, a, mo te rangahau tere, ka ako matou me pehea te tuhi i nga kaitohutohu e whakahaere ana i nga kaupapa rere mai i te kafka, me pehea hoki te tuhi i nga whakahau (paatohia te takai), i roto i ta maatau keehi - mo nga karere pana a-ringa ki te kaupapa e aroturukitia ana e te kaihoko.

Whakangungu

Kiritaki AlphaVantage

Tuatahi, tuhia he kiritaki aiohttp iti mo nga tono ki te alphavantage.

alphavantage.py

Kaipahua

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Inaa, he maamaa nga mea katoa mai i tera:

  1. Ko te AlphaVantage API he tino ngawari, he ataahua hoki te hoahoa, no reira ka whakatau ahau ki te tuku tono katoa ma te tikanga construct_query kei hea he karanga http.

  2. Ka kawea e ahau nga mara katoa ki snake_case mo te waatea.

  3. Ana, ko te logger.catch whakapaipai mo te putanga traceback ataahua me te korero.

PS Kaua e wareware ki te taapiri i te tohu alphavantage ki te rohe ki te config.yml, ki te kaweake ranei i te taurangi taiao HORTON_SERVICE_APIKEY. Ka whiwhi tatou i tetahi tohu konei.

akomanga CRUD

Ka whiwhi matou i te kohinga peeke hei penapena korero meta mo nga peeke.

pātengi raraunga/security.py

Ki taku whakaaro, kaore he take ki te whakamaarama i tetahi mea i konei, a ko te karaehe turanga he tino ngawari.

tiki_taupānga()

Me taapiri he mahi hei hanga i tetahi ahanoa tono ki roto taupānga.py

Kaipahua

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Inaianei kei a maatau te hanga tono ngawari, he iti nei ka whakawhānuihia e matou, engari, kia kore ai koe e tatari, i konei tohutoro ki App-akomanga. Ka tūtohu ano ahau ki te titiro ki te akomanga tautuhinga, na te mea kei a ia te kawenga mo te nuinga o nga tautuhinga.

Ko te waahanga matua

He kaihoko mo te kohikohi me te pupuri i te rarangi o nga peeke

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Na, tuatahi ka whiwhi tatou i te ahanoa tono faust - he tino ngawari. I muri mai, ka tino whakapuakihia e matou he kaupapa mo to maatau kaihoko ... I konei he mea tika kia whakahuahia he aha te mea, he aha te tawhā o roto me te pehea e taea ai te whakarite rereke.

  1. Ko nga kaupapa i roto i te kafka, ki te hiahia tatou ki te mohio ki te whakamaramatanga tika, he pai ake te panui atu. tuhinga, ka taea ranei e koe te panui whakarapopototanga i runga i te Habré i te reo Ruhia, kei reira nga mea katoa e whakaatu tika ana :)

  2. Tawhā o roto, he pai te korero i roto i te tuhinga faust, ka taea e maatau te whirihora tika i te kaupapa i roto i te waehere, o te tikanga, ko te tikanga ko nga taapiri e whakaratohia ana e nga kaiwhakawhanake faust, hei tauira: te pupuri, te kaupapa here pupuri (ma te whakakore taunoa, engari ka taea e koe te whakarite kiato), te maha o nga wehewehenga mo ia kaupapa (kauteki te mahi, hei tauira, iti iho i te hiranga o te ao tono faust).

  3. I te nuinga o te waa, ka taea e te kaihoko te hanga kaupapa whakahaere me nga uara o te ao, heoi, he pai ki ahau te whakapuaki i nga mea katoa. I tua atu, ko etahi tawhā (hei tauira, te maha o nga wehewehenga, kaupapa here pupuri ranei) o te kaupapa i roto i te panui kaihoko kaore e taea te whirihora.

    Anei te ahua me te kore e tautuhi-a-ringa i te kaupapa:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Ana, inaianei ka whakaahuahia nga mahi a to maatau kaihoko :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Na, i te timatanga o te kaihoko, ka whakatuwherahia e matou he huihuinga aiohttp mo nga tono ma o taatau kaihoko. No reira, ka timata te kaimahi, ka whakarewahia to maatau kaihoko, ka whakatuwherahia he huihuinga - kotahi, mo te wa katoa e rere ana te kaimahi (he maha ranei, mena ka huri koe i te tawhā. whakaahuru mai i tetahi kaihoko whai waeine taunoa).

I muri mai, ka whai i te awa (ka tuu te karere ki roto _, i te mea ko matou, i roto i tenei kaihoko, kaore e aro ki nga korero) o nga karere mai i ta maatau kaupapa, mena kei te noho i te waa o naianei, mena ka tatari taatau huringa mo to ratou taenga mai. Ana, i roto i ta maatau kopae, ka takiuru matou i te rihīti o te karere, ka tiki i te raarangi o nga mahi (ka hoki mai a get_securities ma te taunoa anake, tirohia te waehere kiritaki) ka penapena ki te paataka korero, ka tirohia mena he haumarutanga me te tohu kotahi ano. whakawhiti i roto i te pātengi raraunga , ki te mea kei reira, ka (te pepa) ka whakahou noa.

Kia whakarewahia ta tatou hangahanga!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Nga waahanga PS wae tukutuku Kaore au e whakaaro ki nga korero i roto i nga tuhinga, na reira ka whakatauhia e matou te haki tika.

I roto i ta maatau whakahau whakarewatanga, i korero matou ki a faust ki hea e rapu ai mo te ahanoa tono me te aha me mahi ki taua mea (whakarewahia he kaimahi) me te taumata whakaputanga korero. Ka whiwhi tatou i te putanga e whai ake nei:

Kaipahua

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Kei te ora!!!

Kia titiro tatou ki te huinga wehewehenga. Ka kite tatou, i hangaia he kaupapa me te ingoa i tohua e matou i roto i te waehere, te nama taunoa o nga wehewehenga (8, i tangohia mai i kaupapa_wehenga - tawhā ahanoa tono), na te mea kaore matou i tohu he uara takitahi mo ta maatau kaupapa (ma nga wehewehenga). Ko te kaihoko i whakarewahia i roto i te kaimahi ka whakawhiwhia ki nga waahanga 8 katoa, na te mea ko ia anake, engari ka korerohia tenei mo nga korero mo te roopu.

Ana, inaianei ka taea e taatau te haere ki tetahi atu matapihi kapeka ka tukuna he panui kau ki ta maatau kaupapa:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS te whakamahi @ e whakaatu ana matou kei te tuku karere ki tetahi kaupapa ko "collect_securities".

I tenei take, i haere te karere ki te wehewehenga 6 - ka taea e koe te tirotiro i tenei ma te haere ki te kafdrop on localhost:9000

Ma te haere ki te matapihi mutunga me ta maatau kaimahi, ka kite tatou i tetahi panui koa i tukuna ma te whakamahi i te loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Ka taea hoki e tatou te titiro ki te mongo (ma te whakamahi i te Robo3T, i te Studio3T ranei) ka kite kei roto i te paataka raraunga nga peeke.

Ehara ahau i te piriona, na reira kei te pai matou ki te whiringa maataki tuatahi.

Nga mahi papamuri mo Faust, Wāhanga II: Nga Kaihoko me nga KapaNga mahi papamuri mo Faust, Wāhanga II: Nga Kaihoko me nga Kapa

Te harikoa me te koa - kua rite te kaihoko tuatahi :)

Kua rite te kaihoko, kia ora te kaihoko hou!

Ae, e koro, he 1/3 noa iho o te huarahi kua whakaritea e tenei tuhinga, engari kaua e ngakaukore, na te mea ka mama ake inaianei.

Inaianei kei te hiahia matou ki tetahi kaihoko hei kohi korero meta ka maka ki roto i te tuhinga kohinga:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

I te mea ka tukatukahia e tenei kaihoko nga korero mo tetahi haumarutanga motuhake, me tohu e matou te tohu (tohu) o tenei haumarutanga i roto i te karere. Mo tenei kaupapa i roto i te faust kei reira Records — nga karaehe e whakaatu ana i te kaupapa korero i roto i te kaupapa kaihoko.

I roto i tenei take, kia haere tatou ki records.py me te whakaahua he aha te ahua o te karere mo tenei kaupapa:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

I whakaaro pea koe, ka whakamahi a faust i te momo kupu python hei whakaahua i te aronuinga karere, na reira ko te putanga iti e tautokohia ana e te whare pukapuka. 3.6.

Me hoki ki te kaihoko, tautuhi nga momo me te taapiri:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Ka kite koe, ka tukuna he tawhā hou me tetahi kaupapa ki te tikanga arawhiti kaupapa - value_type. I tua atu, ko nga mea katoa e whai ana i te kaupapa kotahi, no reira kaore au e kite i tetahi take ki te noho ki tetahi atu mea.

Ana, ko te pa whakamutunga ko te taapiri waea ki te kaihoko kohinga korero meta ki te kohikohi_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Ka whakamahia e matou te kaupapa kua panuitia i mua mo te karere. I tenei take, i whakamahia e ahau te tikanga .cast mai i te mea kaore e tika kia tatari mo te hua mai i te kaihoko, engari he pai te whakahua huarahi tuku karere ki te kaupapa:

  1. maka - e kore e aukati na te mea kaore he hua ka puta. Kaore e taea e koe te tuku i te hua ki tetahi atu kaupapa hei karere.

  2. tuku - kaore e aukati na te mea kaore e tumanakohia he hua. Ka taea e koe te tohu i tetahi kaihoko i roto i te kaupapa ka puta te hua.

  3. patai - ka tatari mo te hua. Ka taea e koe te tohu i tetahi kaihoko i roto i te kaupapa ka puta te hua.

No reira, ko nga kaihoko mo tenei ra!

Te Kapa moemoea

Ko te mea whakamutunga i oati ahau ki te tuhi i tenei wahanga ko nga whakahau. Ka rite ki te korero i mua ake nei, ko nga whakahau i roto i te fau he takai huri noa i te paatene. Inaa, ka piri noa a faust i ta maatau whakahau ritenga ki tana atanga ina tohu ana i te ki -A

I muri i nga korero a nga kaitohutohu i roto agents.py tāpirihia he mahi me te whakapaipai taupānga.whakahaute karanga i te tikanga maka у kohikohi_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

No reira, ki te karangahia e matou te rarangi o nga whakahau, ko ta matou whakahau hou kei roto:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Ka taea e taatau te whakamahi pera me etahi atu, na me timata ano te kaimahi faust ka timata i te kohinga katoa o nga haumaru:

> faust -A horton.agents start-collect-securities

Ka aha a muri ake nei?

I te waahanga e whai ake nei, ma te whakamahi i nga toenga o nga kaihoko hei tauira, ka whakaarohia e matou te tikanga totohu mo te rapu i nga mea tino nui i roto i nga utu kati o te hokohoko mo te tau me te whakarewatanga cron o nga kaihoko.

Heoi ano mo tenei ra! Nga mihi mo te panui :)

Waehere mo tenei waahanga

Nga mahi papamuri mo Faust, Wāhanga II: Nga Kaihoko me nga Kapa

PS I raro i te wahanga whakamutunga i pataia ahau mo te faust and confluent kafka (he aha nga ahuatanga o te whakakotahitanga?). Te ahua nei he maha nga mahi a te confluent, engari ko te meka ko te kore a faust i te tautoko katoa o te kiritaki mo te confluent - e whai ake nei mai i whakaahuatanga o nga here a te kiritaki i roto i te tuhinga.

Source: will.com

Tāpiri i te kōrero