Tascanna cúlra ar Faust, Cuid II: Gníomhairí agus Foirne

Tascanna cúlra ar Faust, Cuid II: Gníomhairí agus Foirne

Tábla na nÁbhar

  1. Cuid I: Réamhrá

  2. Cuid II: Gníomhairí agus Foirne

Cad atá ar siúl againn anseo?

Mar sin, mar sin, an dara cuid. Mar a scríobhadh níos luaithe, déanfaimid an méid seo a leanas ann:

  1. Scríobhaimis cliant beag le haghaidh alphavantage ar aiohttp le hiarratais ar na críochphointí a theastaíonn uainn.

  2. Cruthaímid gníomhaire a bhaileoidh sonraí ar urrúis agus meiteaisnéis orthu.

Ach, is é seo a dhéanfaimid don tionscadal féin, agus i dtéarmaí taighde faust, foghlaimfimid conas gníomhairí a phróiseálann imeachtaí sruth ó kafka a scríobh, chomh maith le conas orduithe a scríobh (cliceáil fillteán), inár gcás - le haghaidh teachtaireachtaí brú láimhe chuig an ábhar a bhfuil an gníomhaire ag déanamh monatóireachta air.

Oiliúint

AlphaVantage Cliant

Gcéad dul síos, scríobhaimis cliant aiohttp beag le haghaidh iarratais ar alphavantage.

alfavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

I ndáiríre, tá gach rud soiléir uaidh:

  1. Tá an API AlphaVantage deartha go simplí agus go hálainn, agus mar sin chinn mé gach iarratas a dhéanamh tríd an modh construct_query áit a bhfuil glao http ina dhiaidh sin.

  2. Tugaim na páirceanna go léir go snake_case do chompord.

  3. Bhuel, an maisiú logger.catch le haghaidh aschur rian álainn agus faisnéiseach.

PS Ná déan dearmad an comhartha alphavantage a chur leis go háitiúil le config.yml, nó an athróg timpeallachta a onnmhairiú HORTON_SERVICE_APIKEY. Faighimid comhartha anseo.

rang CRUD

Beidh bailiúchán urrús againn chun meiteafhaisnéis a stóráil faoi urrúis.

bunachar sonraí/security.py

I mo thuairimse, ní gá aon rud a mhíniú anseo, agus tá an bunrang féin simplí go leor.

faigh_app()

Cuirimis feidhm leis chun réad feidhmchláir a chruthú i app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Faoi láthair beidh an cruthú iarratais is simplí againn, beagán níos déanaí déanfaimid é a leathnú, áfach, ionas nach mbeidh tú ag fanacht leat, anseo tagairtí chuig App-rang. Molaim duit freisin féachaint ar an rang socruithe, ós rud é go bhfuil sé freagrach as an chuid is mó de na socruithe.

Príomhchuid

Gníomhaire do bhailiú agus do chothabháil liosta urrús

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Mar sin, ar dtús faighimid an réad iarratais faust - tá sé simplí go leor. Ansin, dearbhaímid go sainráite ábhar dár ngníomhaire... Anseo is fiú a lua cad atá ann, cad é an paraiméadar inmheánach agus conas is féidir é seo a shocrú ar bhealach difriúil.

  1. Ábhair i kafka, más mian linn an sainmhíniú cruinn a fháil, is fearr é a léamh as. doiciméad, nó is féidir leat léamh achoimre ar Habré i Rúisis, áit a léirítear gach rud go cruinn freisin :)

  2. Paraiméadar inmheánach, a thuairiscítear go maith sa doiciméad faust, ligeann dúinn an t-ábhar a chumrú go díreach sa chód, ar ndóigh, ciallaíonn sé seo na paraiméadair a sholáthraíonn na forbróirí faust, mar shampla: coinneáil, beartas coinneála (de réir réamhshocraithe scrios, ach is féidir leat a shocrú dhlúth), líon na ndeighiltí in aghaidh an ábhair (scóira dhéanamh, mar shampla, níos lú ná tábhacht dhomhanda iarratais faust).

  3. Go ginearálta, is féidir leis an ngníomhaire ábhar bainistithe a chruthú le luachanna domhanda, áfach, is maith liom gach rud a dhearbhú go sainráite. Ina theannta sin, ní féidir roinnt paraiméadair (mar shampla, líon na ndeighiltí nó beartas coinneála) den ábhar san fhógra gníomhaire a chumrú.

    Seo an chuma a bheadh ​​air gan an topaic a shainiú de láimh:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Bhuel, déanaimis cur síos anois ar cad a dhéanfaidh ár ngníomhaire :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Mar sin, ag tús an ghníomhaire, osclaíonn muid seisiún aiohttp d'iarratais tríd ár gcliant. Mar sin, agus oibrí á thosú, nuair a sheolfar ár ngníomhaire, osclófar seisiún láithreach - ceann amháin, ar feadh an ama ar fad a bhíonn an t-oibrí ag rith (nó roinnt, má athraíonn tú an paraiméadar comhthoiliúlacht ó ghníomhaire a bhfuil aonad réamhshocraithe aige).

Ansin, leanaimid an sruth (cuirimid an teachtaireacht isteach _, ós rud é nach bhfuil cúram orainn, sa ghníomhaire seo, faoi ábhar) na dteachtaireachtaí ónár n-ábhar, má tá siad ann ag an bhfritháireamh reatha, ar shlí eile fanfaidh ár dtimthriall chun iad a theacht. Bhuel, taobh istigh dár lúb, déanaimid logáil isteach ar admháil na teachtaireachta, faighimid liosta d’urrúis ghníomhacha (tuairisceáin get_securities gníomhach ach amháin de réir réamhshocraithe, féach ar an gcód cliant) agus sábhálaimid é sa bhunachar sonraí, ag seiceáil an bhfuil slándáil ann leis an ticker céanna agus mhalartú sa bhunachar sonraí , má tá , ansin é ( an páipéar ) a nuashonrú go simplí .

Seolfaimid ár gcruthú!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Gnéithe PS chomhpháirt gréasáin Ní bhreithneoidh mé faust sna hailt, mar sin leagamar an bhratach cuí.

Inár n-ordú seolta, d'inis muid do faust cá háit le cuardach a dhéanamh ar réad an fheidhmchláir agus cad ba cheart a dhéanamh leis (oibrí a sheoladh) leis an leibhéal aschuir loga faisnéise. Faighimid an t-aschur seo a leanas:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Tá sé beo!!!

Breathnaímid ar an tacar críochdheighilte. Mar a fheicimid, cruthaíodh topaic leis an ainm a d'ainmnigh muid sa chód, uimhir réamhshocraithe na ndeighiltí (8, tógtha ó topaic_rannáin - paraiméadar réad iarratais), ós rud é nár shonraigh muid luach aonair dár n-ábhar (trí dheighiltí). Sanntar na 8 ndeighiltí go léir don ghníomhaire seolta san oibrí, ós rud é gurb é an t-aon cheann é, ach déanfar é seo a phlé go mion sa chuid faoi bhraisliú.

Bhuel, anois is féidir linn dul chuig fuinneog teirminéil eile agus teachtaireacht folamh a sheoladh chuig ár n-ábhar:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS ag baint úsáide as @ Léiríonn muid go bhfuil muid ag seoladh teachtaireacht chuig topaic darb ainm “collect_securities”.

Sa chás seo, chuaigh an teachtaireacht go dtí deighilt 6 - is féidir leat é seo a sheiceáil trí dul go dtí kafdrop ar localhost:9000

Ag dul go dtí an fhuinneog teirminéil lenár n-oibrí, feicfimid teachtaireacht sásta seolta ag baint úsáide as loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Is féidir linn breathnú ar mongo freisin (ag baint úsáide as Robo3T nó Studio3T) agus a fheiceáil go bhfuil na hurrúis sa bhunachar sonraí:

Ní billiúnaí mé, agus mar sin táimid sásta leis an gcéad rogha féachana.

Tascanna cúlra ar Faust, Cuid II: Gníomhairí agus FoirneTascanna cúlra ar Faust, Cuid II: Gníomhairí agus Foirne

Sonas agus áthas - tá an chéad ghníomhaire réidh :)

Gníomhaire réidh, fada beo an gníomhaire nua!

Sea, uaisle, ní mór dúinn clúdaithe ach 1/3 den cosán ullmhaithe ag an Airteagal seo, ach ná a dhíspreagadh, mar anois beidh sé níos éasca.

Mar sin anois tá gníomhaire ag teastáil uainn a bhailíonn meiteaisnéis agus a chuireann isteach i ndoiciméad bailiúcháin é:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Ós rud é go bpróiseálfaidh an gníomhaire seo faisnéis faoi shlándáil ar leith, ní mór dúinn ticeoir (siombail) na slándála seo a chur in iúl sa teachtaireacht. Chun na críche seo i faust tá Taifid — aicmí a dhearbhaíonn an scéim teachtaireachta san ábhar gníomhaire.

Sa chás seo, a ligean ar dul go dtí taifid.py agus déan cur síos ar an chuma ar cheart a bheith ar an teachtaireacht don ábhar seo:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Mar a thug tú faoi deara, úsáideann faust anótáil de chineál python chun cur síos a dhéanamh ar scéimre na teachtaireachta, agus sin an fáth a bhfuil an leagan íosta a dtacaíonn an leabharlann leis 3.6.

Fillfimid ar an ngníomhaire, socróimid na cineálacha agus cuirfimid leis:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Mar a fheiceann tú, cuirimid paraiméadar nua le scéim ar aghaidh chuig an modh túsaithe topaicí - value_type. Thairis sin, leanann gach rud an scéim chéanna, mar sin ní fheicim aon phointe cónaithe ar aon rud eile.

Bhuel, is é an teagmháil dheireanach ná glao a chur leis an ngníomhaire bailithe faisnéise meitea chun collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Bainimid úsáid as an scéim a fógraíodh roimhe seo don teachtaireacht. Sa chás seo, d'úsáid mé an modh .cast ós rud é nach gá dúinn fanacht leis an toradh ón ngníomhaire, ach is fiú a lua bealaí seol teachtaireacht chuig an topaic:

  1. cast - ní chuireann sé bac ar toisc nach bhfuil sé ag súil le toradh. Ní féidir leat an toradh a sheoladh chuig ábhar eile mar theachtaireacht.

  2. seol - ní bhacann sé toisc nach bhfuil sé ag súil le toradh. Is féidir leat gníomhaire a shonrú san ábhar a rachaidh an toradh chuige.

  3. fiafraigh - fanann sé le toradh. Is féidir leat gníomhaire a shonrú san ábhar a rachaidh an toradh chuige.

Mar sin, sin é go léir le gníomhairí don lá atá inniu ann!

An fhoireann aisling

Is é an rud deireanach a gheall mé a scríobh sa chuid seo ná orduithe. Mar a luadh níos luaithe, tá orduithe i faust fillteán timpeall cliceáil. Go deimhin, ní dhéanann Faust ach ár n-ordú saincheaptha a cheangal lena chomhéadan agus an eochair -A á shonrú

Tar éis na gníomhairí fógartha i gníomhairí.py cuir feidhm le maisitheoir app.ordúag glaoch ar an modh caitheadh у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Mar sin, má thugaimid an liosta orduithe, beidh ár n-ordú nua ann:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Is féidir linn é a úsáid mar aon duine eile, mar sin déanaimis an t-oibrí faust a atosú agus tús a chur le bailiúchán iomlán urrús:

> faust -A horton.agents start-collect-securities

Cad a tharlóidh ina dhiaidh sin?

Sa chéad chuid eile, ag baint úsáide as na gníomhairí atá fágtha mar shampla, déanfaimid machnamh ar an meicníocht doirteal chun cuardach a dhéanamh ar dhálaí foircneacha i bpraghsanna deiridh trádála don bhliain agus seoladh cron gníomhairí.

Sin uile don lá inniu! Go raibh maith agat as léamh :)

Cód le haghaidh an chuid seo

Tascanna cúlra ar Faust, Cuid II: Gníomhairí agus Foirne

PS Faoin gcuid dheireanach cuireadh ceist orm faoi faust agus confluent kafka (cad iad na gnéithe atá ag comhluadar?). Dealraíonn sé go bhfuil confluent níos feidhmiúla ar go leor bealaí, ach is é fírinne an scéil nach bhfuil tacaíocht iomlán cliant ag faust le haghaidh confluent - leanann sé seo ó cur síos ar shrianta cliant sa doiciméad.

Foinse: will.com

Add a comment