Tasgau cefndir ar Faust, Rhan II: Asiantau a Thimau

Tasgau cefndir ar Faust, Rhan II: Asiantau a Thimau

Tabl cynnwys

  1. Rhan I: Cyflwyniad

  2. Rhan II: Asiantau a Thimau

Beth ydym ni'n ei wneud yma?

Felly, felly, yr ail ran. Fel yr ysgrifennwyd yn gynharach, ynddo byddwn yn gwneud y canlynol:

  1. Gadewch i ni ysgrifennu cleient bach ar gyfer alphavantage ar aiohttp gyda cheisiadau am y pwyntiau terfyn sydd eu hangen arnom.

  2. Gadewch i ni greu asiant a fydd yn casglu data ar warantau a gwybodaeth meta arnynt.

Ond, dyma beth fyddwn ni'n ei wneud ar gyfer y prosiect ei hun, ac o ran ymchwil faust, byddwn yn dysgu sut i ysgrifennu asiantau sy'n prosesu digwyddiadau ffrydio o kafka, yn ogystal â sut i ysgrifennu gorchmynion (cliciwch amlapiwr), yn ein hachos ni - ar gyfer negeseuon gwthio â llaw i'r pwnc y mae'r asiant yn ei fonitro.

Hyfforddiant

Cleient AlphaVantage

Yn gyntaf, gadewch i ni ysgrifennu cleient aiohttp bach ar gyfer ceisiadau i alphavantage.

alffantiaeth.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Mewn gwirionedd, mae popeth yn glir ohono:

  1. Mae API AlphaVantage wedi'i ddylunio'n eithaf syml a hyfryd, felly penderfynais wneud pob cais trwy'r dull construct_query lle yn ei dro mae galwad http.

  2. Rwy'n dod â'r holl feysydd i snake_case er hwylustod.

  3. Wel, mae'r addurn logger.catch ar gyfer allbwn olrhain hardd ac addysgiadol.

PS Peidiwch ag anghofio ychwanegu'r tocyn alphavantage yn lleol i config.yml, neu allforio newidyn yr amgylchedd HORTON_SERVICE_APIKEY. Rydym yn derbyn tocyn yma.

dosbarth CRUD

Bydd gennym gasgliad gwarantau i storio gwybodaeth meta am warantau.

cronfa ddata/diogelwch.py

Yn fy marn i, nid oes angen esbonio unrhyw beth yma, ac mae'r dosbarth sylfaen ei hun yn eithaf syml.

cael_app()

Gadewch i ni ychwanegu swyddogaeth ar gyfer creu gwrthrych cais i mewn app.py.

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Am y tro bydd gennym y creu cais symlaf, ychydig yn ddiweddarach byddwn yn ei ehangu, fodd bynnag, er mwyn peidio â'ch cadw'n aros, yma cyfeiriadau i App-dosbarth. Rwyf hefyd yn argymell edrych ar y dosbarth gosodiadau, gan ei fod yn gyfrifol am y rhan fwyaf o'r gosodiadau.

Prif gorff

Asiant ar gyfer casglu a chynnal rhestr o warantau

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Felly, yn gyntaf rydyn ni'n cael gwrthrych y cais faust - mae'n eithaf syml. Nesaf, rydym yn datgan yn benodol bwnc i'n hasiant... Yma mae'n werth sôn am beth ydyw, beth yw'r paramedr mewnol a sut y gellir trefnu hyn yn wahanol.

  1. Pynciau yn kafka, os ydym am wybod yr union ddiffiniad, mae'n well darllen i ffwrdd. dogfen, neu gallwch ddarllen compendiwm ar Habré yn Rwsieg, lle mae popeth hefyd yn cael ei adlewyrchu'n eithaf cywir :)

  2. Paramedr mewnol, a ddisgrifir yn eithaf da yn y doc faust, yn ein galluogi i ffurfweddu'r pwnc yn uniongyrchol yn y cod, wrth gwrs, mae hyn yn golygu'r paramedrau a ddarperir gan y datblygwyr faust, er enghraifft: cadw, polisi cadw (yn ddiofyn dileu, ond gallwch chi osod compact), nifer y rhaniadau fesul pwnc (rhaniadaui wneud, er enghraifft, llai na arwyddocâd byd-eang ceisiadau faust).

  3. Yn gyffredinol, gall yr asiant greu pwnc a reolir gyda gwerthoedd byd-eang, fodd bynnag, hoffwn ddatgan popeth yn benodol. Yn ogystal, ni ellir ffurfweddu rhai paramedrau (er enghraifft, nifer y rhaniadau neu bolisi cadw) y pwnc yn yr hysbyseb asiant.

    Dyma sut y gallai edrych heb ddiffinio'r pwnc â llaw:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Wel, nawr gadewch i ni ddisgrifio beth fydd ein hasiant yn ei wneud :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Felly, ar ddechrau'r asiant, rydym yn agor sesiwn aiohttp ar gyfer ceisiadau trwy ein cleient. Felly, wrth ddechrau gweithiwr, pan fydd ein hasiant yn cael ei lansio, bydd sesiwn yn cael ei hagor ar unwaith - un, am yr amser cyfan mae'r gweithiwr yn rhedeg (neu sawl, os byddwch chi'n newid y paramedr arian cyfred gan asiant ag uned ddiofyn).

Nesaf, rydym yn dilyn y ffrwd (rydym yn gosod y neges i mewn _, gan nad ydym ni, yn yr asiant hwn, yn poeni am gynnwys) negeseuon o'n pwnc, os ydynt yn bodoli ar y gwrthbwyso cyfredol, fel arall bydd ein cylch yn aros iddynt gyrraedd. Wel, y tu mewn i'n dolen, rydym yn cofnodi derbyn y neges, yn cael rhestr o warantau gweithredol (mae get_securities yn dychwelyd yn weithredol yn ddiofyn, gweler y cod cleient) a'i gadw i'r gronfa ddata, gan wirio a oes diogelwch gyda'r un ticiwr a cyfnewid yn y gronfa ddata , os oes , yna mae'n (y papur ) yn syml yn cael ei diweddaru .

Gadewch i ni lansio ein creadigaeth!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Nodweddion PS cydran gwe Ni fyddaf yn ystyried faust yn yr erthyglau, felly rydym yn gosod y faner briodol.

Yn ein gorchymyn lansio, dywedasom wrth faust ble i chwilio am wrthrych y cais a beth i'w wneud ag ef (lansio gweithiwr) gyda lefel allbwn y log gwybodaeth. Rydym yn cael yr allbwn canlynol:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Mae'n fyw!!!

Gadewch i ni edrych ar y set rhaniad. Fel y gallwn weld, crëwyd pwnc gyda'r enw a ddynodwyd gennym yn y cod, sef y nifer rhagosodedig o raniadau (8, a gymerwyd o rhaniadau_topig - paramedr gwrthrych cais), gan na wnaethom nodi gwerth unigol ar gyfer ein pwnc (trwy raniadau). Neilltuir pob un o'r 8 rhaniad i'r asiant a lansiwyd yn y gweithiwr, gan mai dyma'r unig un, ond bydd hyn yn cael ei drafod yn fanylach yn y rhan am glystyru.

Wel, nawr gallwn fynd i ffenestr derfynell arall ac anfon neges wag i'n pwnc:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS yn defnyddio @ rydym yn dangos ein bod yn anfon neges at bwnc o'r enw “collect_securities”.

Yn yr achos hwn, aeth y neges i raniad 6 - gallwch wirio hyn trwy fynd i kafdrop on localhost:9000

Wrth fynd i ffenestr y derfynell gyda'n gweithiwr, byddwn yn gweld neges hapus yn cael ei hanfon gan ddefnyddio loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Gallwn hefyd edrych i mewn i mongo (gan ddefnyddio Robo3T neu Studio3T) a gweld bod y gwarantau yn y gronfa ddata:

Nid wyf yn biliwnydd, ac felly rydym yn fodlon ar yr opsiwn gwylio cyntaf.

Tasgau cefndir ar Faust, Rhan II: Asiantau a ThimauTasgau cefndir ar Faust, Rhan II: Asiantau a Thimau

Hapusrwydd a llawenydd - mae'r asiant cyntaf yn barod :)

Asiant yn barod, hir oes yr asiant newydd!

Ydym, foneddigion, dim ond 1/3 o'r llwybr a baratowyd gan yr erthygl hon yr ydym wedi'i orchuddio, ond peidiwch â digalonni, oherwydd nawr bydd yn haws.

Felly nawr mae angen asiant arnom sy'n casglu gwybodaeth feta a'i roi mewn dogfen gasglu:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Gan y bydd yr asiant hwn yn prosesu gwybodaeth am ddiogelwch penodol, mae angen i ni nodi ticiwr (symbol) y diogelwch hwn yn y neges. At y diben hwn yn faust y mae Cofnodion — dosbarthiadau sy'n datgan y cynllun neges yn y pwnc asiant.

Yn yr achos hwn, gadewch i ni fynd i cofnodion.py a disgrifiwch sut dylai'r neges ar gyfer y pwnc hwn edrych:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Fel y gallech fod wedi dyfalu, mae faust yn defnyddio'r anodiad math python i ddisgrifio'r sgema neges, a dyna pam mai'r fersiwn lleiaf a gefnogir gan y llyfrgell yw 3.6.

Gadewch i ni ddychwelyd at yr asiant, gosodwch y mathau a'u hychwanegu:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Fel y gwelwch, rydym yn pasio paramedr newydd gyda chynllun i'r dull cychwyn pwnc - value_type. Ymhellach, mae popeth yn dilyn yr un cynllun, felly nid wyf yn gweld unrhyw bwynt mewn annedd ar unrhyw beth arall.

Wel, y cyffyrddiad olaf yw ychwanegu galwad at yr asiant casglu gwybodaeth meta i collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Rydym yn defnyddio'r cynllun a gyhoeddwyd yn flaenorol ar gyfer y neges. Yn yr achos hwn, defnyddiais y dull .cast gan nad oes angen i ni aros am y canlyniad gan yr asiant, ond mae'n werth nodi hynny ffyrdd anfon neges at y pwnc:

  1. cast - nid yw'n rhwystro oherwydd nad yw'n disgwyl canlyniad. Ni allwch anfon y canlyniad i bwnc arall fel neges.

  2. anfon - nid yw'n rhwystro oherwydd nid yw'n disgwyl canlyniad. Gallwch chi nodi asiant yn y pwnc y bydd y canlyniad yn mynd iddo.

  3. gofyn - aros am ganlyniad. Gallwch chi nodi asiant yn y pwnc y bydd y canlyniad yn mynd iddo.

Felly, dyna i gyd gydag asiantau ar gyfer heddiw!

Tîm y freuddwyd

Y peth diweddaf a addewais i ysgrifenu yn y rhan hon ydyw gorchymynion. Fel y soniwyd yn gynharach, mae gorchmynion yn faust yn lapio o gwmpas clic. Mewn gwirionedd, mae faust yn atodi ein gorchymyn arferol i'w ryngwyneb wrth nodi'r allwedd -A

Ar ôl yr asiantau cyhoeddedig yn asiantau.py ychwanegu swyddogaeth gydag addurnwr ap.commandgalw y dull bwrw у casglu_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Felly, os byddwn yn galw'r rhestr o orchmynion, bydd ein gorchymyn newydd ynddo:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Gallwn ei ddefnyddio fel unrhyw un arall, felly gadewch i ni ailgychwyn y gweithiwr faust a dechrau casgliad llawn o warantau:

> faust -A horton.agents start-collect-securities

Beth fydd yn digwydd nesaf?

Yn y rhan nesaf, gan ddefnyddio'r asiantau sy'n weddill fel enghraifft, byddwn yn ystyried y mecanwaith sinc ar gyfer chwilio am eithafion yn y prisiau cau masnachu am y flwyddyn a lansiad cron asiantau.

Dyna i gyd am heddiw! Diolch am ddarllen :)

Cod ar gyfer y rhan hon

Tasgau cefndir ar Faust, Rhan II: Asiantau a Thimau

ON O dan y rhan olaf holwyd fi am faust a confluent kafka (pa nodweddion sydd gan gydlifiad?). Mae'n ymddangos bod cydlifiad yn fwy ymarferol mewn sawl ffordd, ond y ffaith yw nad oes gan Faust gefnogaeth cleient lawn ar gyfer cydlifiad - mae hyn yn dilyn o disgrifiadau o gyfyngiadau cleient yn y ddogfen.

Ffynhonnell: hab.com

Ychwanegu sylw