CĂšl-fhiosrachadh air Faust, PĂ irt II: Luchd-ionaid agus Sgiobaidhean

CĂšl-fhiosrachadh air Faust, PĂ irt II: Luchd-ionaid agus Sgiobaidhean

ClĂ r-innse

  1. PĂ irt I: Ro-rĂ dh

  2. PĂ irt II: Luchd-ionaid agus Sgiobaidhean

Dè tha sinn a' dèanamh an seo?

Mar sin, mar sin, an dĂ rna pĂ irt. Mar a chaidh a sgrĂŹobhadh roimhe, nĂŹ sinn na leanas ann:

  1. Sgrìobhamaid neach-dèiligidh beag airson alphavantage air aiohttp le iarrtasan airson na puingean crìochnachaidh a dh’ fheumas sinn.

  2. Cruthaichidh sinn àidseant a chruinnicheas dàta mu thèarainteachdan agus meta-fhiosrachadh mun deidhinn.

Ach, is e seo a nì sinn airson a’ phròiseict fhèin, agus a thaobh rannsachadh faust, ionnsaichidh sinn mar a sgrìobhas sinn riochdairean a bhios a ’giullachd tachartasan bho kafka, a bharrachd air mar a sgrìobhas sinn òrdughan (cliog air wrapper), nar cùise - airson teachdaireachdan putaidh làimhe chun chuspair a tha an neach-ionaid a’ cumail sùil.

Ullachadh

Cliant AlphaVantage

An toiseach, sgrÏobhamaid neach-dèiligidh beag aiohttp airson iarrtasan gu alphavantage.

alphavantage.py

spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Gu fĂŹrinneach, tha a h-uile dad soilleir bhuaithe:

  1. Tha an AlphaVantage API air a dhealbhadh gu sìmplidh agus gu breagha, agus mar sin chuir mi romham a h-uile iarrtas a dhèanamh tron ​​​​dòigh construct_query far a bheil gairm http an uair sin.

  2. Bheir mi na raointean gu lèir snake_case airson comhfhurtachd.

  3. Uill, an sgeadachadh logger.catch airson toradh traceback brèagha agus fiosrachail.

PS Na dÏochuimhnich cuir ris an tòcan alphavantage gu h-ionadail gu config.yml, no às-mhalairt caochladair na h-àrainneachd HORTON_SERVICE_APIKEY. Gheibh sinn tòcan an seo.

clas CRUD

Bidh cruinneachadh thèarainteachdan againn gus meata-fhiosrachadh a stòradh mu thèarainteachdan.

stòr-dàta/tèarainteachd.py

Na mo bheachd-sa, chan eil feum air dad a mhÏneachadh an seo, agus tha an clas bunaiteach fhèin gu math sÏmplidh.

faigh_app()

Nach cuir sinn gnĂŹomh airson rud tagraidh a chruthachadh a-steach app.py

spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Airson a-nis bidh an cruthachadh tagraidh as sìmplidh againn, beagan nas fhaide air adhart leudaichidh sinn e, ge-tà, gus nach cùm thu a ’feitheamh, an seo iomraidhean gu App-class. Tha mi cuideachd a 'toirt comhairle dhut sùil a thoirt air a' chlas roghainnean, oir tha e an urra ris a 'mhòr-chuid de na roghainnean.

Am prĂŹomh phĂ irt

Àidseant airson a 'cruinneachadh agus a' cumail suas liosta de thèarainteachdan

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Mar sin, an toiseach gheibh sinn an stuth tagraidh faust - tha e gu math sìmplidh. An ath rud, bidh sinn gu soilleir a’ cur an cèill cuspair don neach-ionaid againn ... An seo is fhiach iomradh a thoirt air dè a th’ ann, dè am paramadair a-staigh agus mar as urrainnear seo a chuir air dòigh ann an dòigh eadar-dhealaichte.

  1. Cuspairean ann an kafka, ma tha sinn airson faighinn a-mach an dearbh mhĂŹneachadh, tha e nas fheĂ rr a leughadh dheth. sgrĂŹobhainn, no faodaidh tu leughadh eas-chruthach air an ionad ann an Ruisis, far a bheil a h-uile cĂ il cuideachd air a nochdadh gu math ceart :)

  2. Paramadair a-staigh, air a mhìneachadh gu math anns an doc faust, a’ leigeil leinn an cuspair a rèiteachadh gu dìreach anns a ’chòd, gu dearbh, tha seo a’ ciallachadh na paramadairean a thug an luchd-leasachaidh faust seachad, mar eisimpleir: gleidheadh, poileasaidh glèidhidh (cuir às gu bunaiteach, ach faodaidh tu suidheachadh cùmhnant), an àireamh de phàirtean a rèir cuspair (sgòrana dhèanamh, mar eisimpleir, nas lugha na cudromachd cruinneil iarrtasan faust).

  3. San fharsaingeachd, faodaidh an neach-ionaid cuspair stiÚirichte a chruthachadh le luachan cruinneil, ge-tà, is toil leam a h-uile dad fhoillseachadh gu soilleir. A bharrachd air an sin, chan urrainnear cuid de pharamadairean (mar eisimpleir, an àireamh de sgaradh no poileasaidh glèidhidh) den chuspair san t-sanas àidseant a rèiteachadh.

    Seo cò ris a bhiodh e coltach gun a bhith a’ mìneachadh a’ chuspair le làimh:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Uill, a-nis mÏnichidh sinn dè a nÏ an neach-ionaid againn :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Mar sin, aig toiseach an neach-ionaid, bidh sinn a 'fosgladh seisean aiohttp airson iarrtasan tron ​​​​neach-dèiligidh againn. Mar sin, nuair a thòisicheas tu air neach-obrach, nuair a thèid an neach-ionaid againn a chuir air bhog, thèid seisean fhosgladh sa bhad - aon, airson an ùine gu lèir a bhios an neach-obrach a ’ruith (no grunn, ma dh’ atharraicheas tu am paramadair airgead-crìche bho àidseant le aonad bunaiteach).

An uairsin, leanaidh sinn an t-sruth (bidh sinn a 'cur an teachdaireachd a-steach _, leis nach eil sinn, san àidseant seo, a’ gabhail cùram mu shusbaint) teachdaireachdan bhon chuspair againn, ma tha iad ann aig an ìre gnàthach, air neo fuirichidh ar cearcall gus an ruig iad. Uill, taobh a-staigh ar lùb, bidh sinn a’ clàradh cuidhteas na teachdaireachd, a’ faighinn liosta de thèarainteachdan gnìomhach (tilleadh get_securities a-mhàin gnìomhach gu bunaiteach, faic còd teachdaiche) agus sàbhail e don stòr-dàta, a’ sgrùdadh a bheil tèarainteachd ann leis an aon ticker agus iomlaid anns an stòr-dàta , ma tha, bidh e (am pàipear) dìreach air ùrachadh.

Nach cuir sinn air bhog ar cruthachadh!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Feartan PS co-phĂ irt lĂŹn Cha bheachdaich mi air faust anns na h-artaigilean, agus mar sin shuidhich sinn am bratach iomchaidh.

Anns an àithne tòiseachaidh againn, dh’ innis sinn dha faust càite am bu chòir dhut coimhead airson an nì tagraidh agus dè a nì thu leis (cuir air bhog neach-obrach) leis an ìre toraidh log fiosrachaidh. Gheibh sinn an toradh a leanas:

spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... НОги, НОги, НОги ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Tha e beò !!!

Bheir sinn sùil air an t-seata sgaradh. Mar a chì sinn, chaidh cuspair a chruthachadh leis an ainm a dh'ainmich sinn sa chòd, an àireamh bunaiteach de phàirtean (8, air a thoirt bho cuspair_partitions - paramadair cuspair tagraidh), leis nach do shònraich sinn luach fa leth airson ar cuspair (tro sgaradh). Tha an neach-ionaid a chaidh a chuir air bhog san neach-obrach a’ faighinn a h-uile sgaradh 8, leis gur e seo an aon fhear, ach thèid seo a dheasbad nas mionaidiche anns a ’phàirt mu bhith a’ cruinneachadh.

Uill, a-nis is urrainn dhuinn a dhol gu uinneag crĂŹche eile agus teachdaireachd falamh a chuir chun chuspair againn:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS a 'cleachdadh @ tha sinn a’ sealltainn gu bheil sinn a’ cur teachdaireachd gu cuspair leis an ainm “collect_securities”.

Anns a 'chĂšis seo, chaidh an teachdaireachd gu sgaradh 6 - faodaidh tu seo a dhearbhadh le bhith a' dol gu kafdrop air localhost:9000

A’ dol gu uinneag a’ chrìoch leis an neach-obrach againn, chì sinn teachdaireachd toilichte air a chuir a’ cleachdadh loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Faodaidh sinn cuideachd sgrùdadh a dhèanamh air mongo (a’ cleachdadh Robo3T no Studio3T) agus faicinn gu bheil na tèarainteachdan san stòr-dàta:

Chan e billeanair a th’ annam, agus mar sin tha sinn riaraichte leis a’ chiad roghainn seallaidh.

CĂšl-fhiosrachadh air Faust, PĂ irt II: Luchd-ionaid agus SgiobaidheanCĂšl-fhiosrachadh air Faust, PĂ irt II: Luchd-ionaid agus Sgiobaidhean

Sonas agus toileachas - tha a 'chiad Ă idseant deiseil :)

Agent deiseil, fada beò an neach-ionaid Úr!

Tha, a dhaoine uaisle, cha do chòmhdaich sinn ach 1/3 den t-slighe a chaidh ullachadh leis an artaigil seo, ach na bi air do mhealladh, oir a-nis bidh e nas fhasa.

Mar sin a-nis tha feum againn air Ă idseant a chruinnicheas fiosrachadh meta agus a chuireas ann an sgrĂŹobhainn cruinneachaidh e:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Leis gum bi an neach-ionaid seo a’ pròiseasadh fiosrachadh mu thèarainteachd sònraichte, feumaidh sinn an ticker (samhla) den tèarainteachd seo a chomharrachadh anns an teachdaireachd. Airson an adhbhair seo ann am faust tha clàran - clasaichean a bhios ag ainmeachadh an sgeama teachdaireachd ann an cuspair an àidseant.

Anns a 'chùis seo, rachamaid gu clàran.py agus thoir cunntas air cò ris a bu chòir an teachdaireachd airson a’ chuspair seo a bhith coltach:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Mar is dòcha gu robh thu air smaoineachadh, bidh faust a’ cleachdadh an nota seòrsa python gus cunntas a thoirt air sgeama na teachdaireachd, agus is e sin as coireach gur e an dreach as lugha a tha a’ faighinn taic bhon leabharlann 3.6.

Tillidh sinn chun neach-ionaid, suidhich na seòrsaichean agus cuir ris:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Mar a chì thu, bidh sinn a’ dol seachad air paramadair ùr le sgeama gu modh tòiseachaidh cuspair - value_type. Nas fhaide, tha a h-uile dad a 'leantainn an aon sgeama, agus mar sin chan eil mi a' faicinn feum sam bith a bhith a 'fuireach air rud sam bith eile.

Uill, is e an suathadh mu dheireadh fios a chuir chun neach-ionaid cruinneachadh fiosrachaidh meta gu collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Cleachdaidh sinn an sgeama a chaidh ainmeachadh roimhe airson na teachdaireachd. Anns a 'chÚis seo, chleachd mi an dòigh .cast oir chan fheum sinn feitheamh airson an toradh bhon neach-ionaid, ach is fhiach iomradh a thoirt air sin dòighean cuir teachdaireachd chun chuspair:

  1. tilgeadh - chan eil e a’ bacadh leis nach eil dùil aige ri toradh. Chan urrainn dhut an toradh a chuir gu cuspair eile mar theachdaireachd.

  2. cuir - chan eil e a’ bacadh leis nach eil dùil aige ri toradh. Faodaidh tu àidseant a shònrachadh anns a’ chuspair air an tèid an toradh.

  3. faighnich - feitheamh ri toradh. Faodaidh tu àidseant a shònrachadh anns a’ chuspair air an tèid an toradh.

Mar sin, tha sin uile le riochdairean airson an-diugh!

An sgioba aisling

Is e an rud mu dheireadh a gheall mi a sgrìobhadh sa phàirt seo òrdughan. Mar a chaidh a ràdh na bu thràithe, tha òrdughan ann am faust mar phasgan timcheall air cliog. Gu dearbh, tha faust dìreach a’ ceangal ar n-àithne àbhaisteach ris an eadar-aghaidh aige nuair a tha e a’ sònrachadh an iuchair -A

Às deidh na riochdairean ainmichte a-steach àidseantan.py cuir gnìomh le sgeadadair app.commandgairm an dòigh sgaoileadh у cruinnich_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Mar sin, ma chanas sinn ris an liosta òrdughan, bidh an àithne Úr againn ann:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Faodaidh sinn a chleachdadh mar dhuine sam bith eile, mar sin leig leinn an neach-obrach faust ath-thòiseachadh agus tòiseachadh air cruinneachadh làn de thèarainteachdan:

> faust -A horton.agents start-collect-securities

Dè thachras a-nis?

Anns an ath phàirt, a’ cleachdadh na h-àidseantan a tha air fhàgail mar eisimpleir, beachdaichidh sinn air an inneal sinc airson a bhith a’ lorg fìor chrìochan ann am prìsean dùnaidh malairt airson na bliadhna agus cur air bhog riochdairean cron.

Tha sin uile airson an-diugh! Tapadh leibh airson an leughadh :)

Còd airson am pÏos seo a

CĂšl-fhiosrachadh air Faust, PĂ irt II: Luchd-ionaid agus Sgiobaidhean

PS Fon phàirt mu dheireadh chaidh faighneachd dhomh mu dheidhinn faust agus confluent kafka (dè na feartan a th’ aig confluent?). Tha e coltach gu bheil confluent nas gnìomhaiche ann an iomadh dòigh, ach is e an fhìrinn nach eil làn thaic teachdaiche aig Faust airson confluent - tha seo a’ leantainn bho tuairisgeul air cuingealachaidhean teachdaiche san doc.

Source: www.habr.com

Cuir beachd ann