Background Tasks on Faust, Part II: Agents and Commands

Table of contents

  1. Part I: Introduction

  2. Part II: Agents and Commands

What are we doing here?

So, part two. As described earlier, in it we will do the following:

  1. Write a small aiohttp client for alphavantage with requests to the endpoints we need.

  2. Create an agent that collects data on securities and meta information about them.

But that is what we will do for the project itself. As for exploring faust, we will learn how to write agents that process stream events from kafka, and how to write commands (a wrapper around click), in our case for manually pushing a message to the topic the agent is monitoring.

Preparation

AlphaVantage client

First, let's write a small aiohttp client for requests to alphavantage.

alphavantage.py

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Actually, everything is clear from the code itself:

  1. The AlphaVantage API is simple and well designed, so I decided to route all requests through the _construct_query method, which in turn makes the HTTP call.

  2. I convert all field names to snake_case for convenience.

  3. And the logger.catch decorator gives nice, informative traceback output.
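
To make points 1 and 2 concrete, here is a minimal standard-library sketch of the two transformations the client performs around each HTTP call: merging the query parameters and converting field names to snake_case. The helper names build_query_url and to_snake_case, the endpoint URL and the "demo" key are assumptions made for illustration. The real client delegates to aiohttp and the stringcase package, whose handling of edge cases may differ from this regex.

```python
import re
from typing import Any, Dict
from urllib.parse import urlencode, urljoin


def build_query_url(endpoint: str, function: str, api_key: str, **kwargs: Any) -> str:
    """Assemble the request URL the way _construct_query assembles its params."""
    # Later keys win on duplicates, so the client's own datatype/apikey
    # override anything passed in through kwargs.
    params = {"function": function, **kwargs, "datatype": "json", "apikey": api_key}
    return urljoin(endpoint, "query/") + "?" + urlencode(params)


def to_snake_case(name: str) -> str:
    """Simplified stand-in for stringcase.snakecase (camelCase/PascalCase only)."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def format_fields(data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of data with every key converted to snake_case."""
    return {to_snake_case(field): item for field, item in data.items()}


# Hypothetical endpoint and key, used only to show the resulting URL shape.
url = build_query_url("https://www.alphavantage.co/", "OVERVIEW", "demo", symbol="IBM")
overview = format_fields({"Symbol": "IBM", "AssetType": "Common Stock"})
```

Because the client's own parameters are spread last, a caller cannot accidentally override the API key or the response format through keyword arguments.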

P.S. Don't forget to add your alphavantage token to config.yml locally, or export the HORTON_SERVICE_APIKEY environment variable. The token is issued on the AlphaVantage site.
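
As a rough illustration of the environment-variable option, the sketch below resolves the key with a fallback. How horton.config actually loads config.yml and the variable is not shown in this article, so the load_api_key helper, the "apikey" config field and the precedence order (environment first) are all assumptions.

```python
import os
from typing import Mapping, Optional


def load_api_key(
    file_config: Mapping[str, str],
    environ: Optional[Mapping[str, str]] = None,
) -> str:
    """Prefer HORTON_SERVICE_APIKEY from the environment, then the file value."""
    env = os.environ if environ is None else environ
    # "apikey" is a hypothetical field name for the already parsed config.yml.
    key = env.get("HORTON_SERVICE_APIKEY") or file_config.get("apikey")
    if not key:
        raise RuntimeError("AlphaVantage token is not configured")
    return key


# The environment value wins over the one from the config file.
api_key = load_api_key({"apikey": "from-file"}, {"HORTON_SERVICE_APIKEY": "from-env"})
```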

CRUD class

We will have a securities collection for storing meta information about securities.

database/security.py

In my opinion, there is nothing to explain here, and the base class itself is quite simple.

get_app()

Let's add a function that creates the application object in app.py

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

For now this is the simplest possible way to create the application; we will extend it a little later. If you don't want to wait, see the reference for the App class. I also recommend looking at the settings class, since it is responsible for most of the configuration.

Main part

An agent for collecting and maintaining the list of securities

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    pass

So, first we get the faust application object, which is simple enough. Next, we explicitly declare a topic for our agent. Here it is worth mentioning what a topic is, what the internal parameter means, and how this could be arranged differently.

  1. Topics in kafka: if you want the exact definition, it is best to read the official documentation, or you can read the compendium on Habr in Russian, where everything is also described quite accurately :)

  2. The internal parameter, which is described quite well in the faust docs, lets us configure the topic directly in code. Naturally, this covers only the parameters the faust developers expose, for example: retention, retention policy (delete by default, but you can set compact), and the number of partitions per topic (partitions, for example to make it smaller than the application's global value).

  3. In general, the agent can create its managed topic itself using the global values; however, I like to declare everything explicitly. In addition, some topic parameters (for example, the number of partitions or the retention policy) cannot be configured in the agent declaration.

    Here is how it would look without declaring the topic manually:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    pass

Well, now let's describe what our agent will do :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

So, at the start of the agent we open an aiohttp session for the requests made through our client. This means that when the worker starts and our agent is launched, a session is opened immediately: one for the whole time the worker is running (or several, if you change the agent's concurrency parameter from its default of one).

Next, we iterate over the stream of messages from our topic (we bind the message to _, since this agent does not care about its content). If there are messages at the current offset we process them; otherwise the loop waits for them to arrive. Inside the loop we log that a message was received, fetch the list of active securities (get_securities returns only active ones by default, see the client code) and save it to the database. For each security we check whether one with the same ticker and exchange already exists in the database; if it does, that document is simply updated.
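
The waiting loop and the "update if it exists, insert otherwise" behaviour can be sketched with the standard library alone. This is a toy model under stated assumptions, not faust or the project's SecurityCRUD: an asyncio.Queue stands in for the topic, a dict keyed by (symbol, exchange) stands in for the mongo collection, and batches stands in for successive responses of client.get_securities(). The sketch also assumes the update merges fields into the existing document.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Tuple

Store = Dict[Tuple[str, str], Dict[str, Any]]


def upsert(store: Store, security: Dict[str, Any]) -> None:
    """Update the document with the same (symbol, exchange) or insert a new one."""
    key = (security["symbol"], security["exchange"])
    store.setdefault(key, {}).update(security)


async def collect(
    queue: asyncio.Queue,
    batches: List[List[Dict[str, Any]]],
    store: Store,
) -> AsyncIterator[bool]:
    """Toy agent: each incoming message triggers one full collection pass."""
    for batch in batches:
        await queue.get()  # suspends here until a message arrives
        for security in batch:
            upsert(store, security)
        yield True


async def main() -> Store:
    queue: asyncio.Queue = asyncio.Queue()
    store: Store = {}
    batches = [
        [{"symbol": "IBM", "exchange": "NYSE", "name": "IBM"}],
        [{"symbol": "IBM", "exchange": "NYSE", "name": "IBM Corp"}],
    ]
    for _ in batches:
        queue.put_nowait(None)  # empty trigger messages; their content is ignored
    async for _ in collect(queue, batches, store):
        pass
    return store


store = asyncio.run(main())
```

Two collection passes over the same security leave a single document in the store, with the fields from the later pass.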

Let's launch our creation!

> docker-compose up -d
... Starting containers ...
> faust -A horton.agents worker --without-web -l info

P.S. I won't cover faust's web component in these articles, so we set the corresponding flag.

In our launch command we told faust where to look for the application object and what to do with it (start a worker), with the log output level set to info. We get the following output:

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... logs, logs, logs ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

It's alive!!!

Let's look at the partition set. As we can see, a topic was created with the name we specified in the code and with the default number of partitions (8, taken from topic_partitions, a parameter of the application object), since we did not set an individual value for our topic (via partitions). The agent launched in the worker was assigned all 8 partitions, since it is the only one, but this will be discussed in more detail in the part about clustering.

Well, now we can go to another terminal window and send an empty message to our topic:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. The @ prefix tells faust that collect_securities is the name of an agent; the message goes to that agent's topic, which in our case is also named "collect_securities".

In this case the message went to partition 6; you can check this by opening kafdrop on localhost:9000

Switching to the terminal window with our worker, we will see the happy message logged with loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

We can also look into mongo (using Robo3T or Studio3T) and see that the securities are in the database:

I'm not a billionaire, so we'll make do with the first viewer.

Happiness and joy: the first agent is ready :)

The agent is ready, long live the new agent!

Yes, gentlemen, we have covered only 1/3 of the path planned for this article, but don't lose heart, because it gets easier from here.

So now we need an agent that collects meta information and puts it into the corresponding document of the collection:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Since this agent will process information about a specific security, we need to specify the ticker (symbol) of that security in the message. For this purpose faust has Records: classes that declare the message schema for the agent's topic.

So let's go to records.py and describe what a message for this topic should look like:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

As you might have guessed, faust uses Python type annotations to describe the message schema, which is why the minimum Python version supported by the library is 3.6.
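
To show the idea of annotations acting as a message schema without depending on faust, here is a small standard-library analogue built on dataclasses and JSON. It is only a sketch: faust.Record is implemented differently and does much more, and the type check inside loads is part of this illustration only, not a statement about faust's default behaviour.

```python
import json
from dataclasses import asdict, dataclass, fields


@dataclass
class CollectSecurityOverview:
    symbol: str
    exchange: str

    def dumps(self) -> bytes:
        """Serialize the message to JSON bytes, as it would travel over a topic."""
        return json.dumps(asdict(self)).encode()

    @classmethod
    def loads(cls, payload: bytes) -> "CollectSecurityOverview":
        """Rebuild the message, checking each field against its annotation."""
        data = json.loads(payload)
        for field in fields(cls):
            if not isinstance(data.get(field.name), field.type):
                raise TypeError(f"{field.name} must be {field.type.__name__}")
        return cls(**data)


payload = CollectSecurityOverview(symbol="IBM", exchange="NYSE").dumps()
event = CollectSecurityOverview.loads(payload)
```

The consumer side then works with attributes such as event.symbol instead of raw dictionary keys, which is the convenience a typed record gives the agent.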

Let's go back to the agent, set the types and finish it:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

As you can see, we passed a new parameter with the schema to the topic initialization method: value_type. Everything else follows the same pattern as before, so I see no point in dwelling on it.

Well, the final touch is to add a call to the meta information collection agent in collect_securities:

....
for security in securities:
    await SecurityCRUD.update_one(
        {"symbol": security["symbol"], "exchange": security["exchange"]},
        security,
        upsert=True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(
            symbol=security["symbol"], exchange=security["exchange"]
        )
    )
....

We use the previously declared schema for the message. In this case I used the .cast method, since we don't need to wait for the result from the agent, but it is worth mentioning the ways to send a message to a topic:

  1. cast: does not block, because it does not expect a result. The result cannot be forwarded to another topic as a message.

  2. send: does not block, because it does not expect a result. You can specify an agent whose topic the result will be sent to.

  3. ask: waits for the result. You can specify an agent whose topic the result will be sent to.
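
The difference between "fire and forget" and "wait for the result" can be modelled with plain asyncio. The ToyAgent class below is a hypothetical stand-in written for this illustration, not faust's implementation: a queue plays the role of the topic, and a future attached to the message plays the role of the reply channel. The send variant with a reply topic is left out to keep the sketch short.

```python
import asyncio
from typing import Any, List


class ToyAgent:
    """Stand-in for an agent: one queue (the topic) and one consumer loop."""

    def __init__(self) -> None:
        self.topic: asyncio.Queue = asyncio.Queue()
        self.processed: List[Any] = []

    async def run(self) -> None:
        while True:
            value, reply = await self.topic.get()
            result = value * 2          # what the agent would yield
            self.processed.append(value)
            if reply is not None:       # only ask attaches a reply slot
                reply.set_result(result)

    async def cast(self, value: Any) -> None:
        """Fire and forget: enqueue the message, never see the result."""
        await self.topic.put((value, None))

    async def ask(self, value: Any) -> Any:
        """Enqueue the message and wait until the agent has produced a result."""
        reply = asyncio.get_running_loop().create_future()
        await self.topic.put((value, reply))
        return await reply


async def main():
    agent = ToyAgent()
    worker = asyncio.create_task(agent.run())
    await agent.cast(1)            # returns before the message is handled
    answer = await agent.ask(21)   # returns after the message is handled
    worker.cancel()
    return answer, agent.processed


answer, processed = asyncio.run(main())
```

In the article's agent, cast is the natural choice: collect_securities only needs to enqueue one overview request per security and has no use for the True that collect_security_overview yields.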

So, that's all about agents for today!

The dream command

The last thing I promised to write about in this part is commands. As mentioned earlier, commands in faust are a wrapper around click. In fact, faust simply attaches our custom command to its own interface when the -A option is specified.

After the agent declarations in agents.py, add a function with the app.command decorator that calls the cast method on collect_securities:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Now, if we list the available commands, our new command will be among them:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

We can use it like any other command, so let's restart the faust worker and start a full collection of securities:

> faust -A horton.agents start-collect-securities

What happens next?

In the next part, using the remaining agents as an example, we will look at the sink mechanism for finding the yearly extremes of closing prices, and at launching agents on a cron schedule.

That's all for today! Thanks for reading :)

Code for this part

P.S. Under the previous part I was asked about faust and confluent kafka (what features does confluent have?). It seems that confluent is more functional in many ways, but the fact is that faust does not have full client support for confluent; this follows from the description of client restrictions in the docs.

source: www.habr.com

Add a comment