Operae background in Faustum, Pars II: Agentia et Teams

Operae background in Faustum, Pars II: Agentia et Teams

mensam de contentis in eodem

  1. Pars I: Introductio

  2. Pars II: Agentia et Teams

Quid hic agimus?

Sic sic secunda pars. Ut supra scriptum est, sequentia faciemus.

  1. Scribamus clientem parvum pro alphavantage in aiohttp cum petitionibus ad terminos quibus opus est.

  2. Faciat procuratorem qui notitias de pignoribus et meta de illis notitias colliget.

Sed, hoc est quod in ipso proposito facturi sumus, et in verbis faustis inquisitionis, discemus quomodo scribere procuratores qui processus amnis eventus ex kafka, tum quomodo scribere mandata (click wrapper), in casu nostro - de dis manualibus nuntiis ad argumentum quod agentis vigilantia est.

Training

AlphaVantage Client

Primum, scribemus parvam clientem aiohttp pro petitionibus alphavantage.

alphavantage.py

Corrumpo

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Patet ex hoc omnia;

  1. AlphaVantage API plane simpliciter et pulchre designatum est, ut omnes petitiones per methodum statuere decrevi construct_query ubi rursus est http vocationem.

  2. Agros omnes fero snake_case ad commodum.

  3. Bene, logger.

PS Noli addere signum alphavantagii localiter config.yml addere, vel ambitum variabilem exportare HORTON_SERVICE_APIKEY. Accipimus indicium hic.

CRUDA classis

Praedes collectionem habebimus ad informationes meta de pignoribus reponendas.

database/security.py

Opinor, nihil hic exponere opus est, et ipsum genus turpe admodum simplex.

get_app ()

Addamus munus pro applicatione obiecti in creando app.py

Corrumpo

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Nunc enim simplicissimam applicationem creationis habebimus, paulo post tamen dilatabimus, ne morari vos hic velitis. references ad App-genus. Moneo etiam te ut inspice genus uncinorum, cum maxime occasus sit.

pelagus

Agens pro colligendis et servandis indicem pignoribus

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Prius ergo rem applicationis faustam obtinemus - est plane simplex. Deinde expresse declaramus thema procuratoris... Hic memorare quid sit, quid sit modulus internus, et quomodo aliter disponi possit.

  1. Argumenta in kafka, si accuratam definitionem cognoscere velimus, melius est legere abesse. documentumAut legere potes abstractum de Habré in Russica, ubi omnia satis accurate etiam reflectuntur :)

  2. Parameter internus, satis bene in faust doc descriptus, argumentam directe in codice configurare sinit, hoc sane significat parametris faustis tincidunt, exempli gratia: retentio, retentio, consilium (per default delere, sed potes ponere. foedus) Numerus partitionum per thema (scoresfacere, verbi gratia, minus quam global significationem utilibus faustis).

  3. In genere, agens argumenta tractata cum valoribus globalibus creare potest, tamen omnia explicite declarare placet. Praeterea nonnulli parametri (exempli gratia, numerus partitionum vel retentionis consilium) argumenti in tabula agentis configurari non possunt.

    Ecce quid simile videri posset sine argumento manually definiendo:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Bene, nunc quid procurator noster facturus sit :) describimus.

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Itaque, initio agentis, petitiones per clientem nostrum sessionem quandam aperimus. Sic, cum ab operario incipiendo, agente nostro emisso, sessio statim aperietur - una, totum enim tempus currit operarius (vel plures, si modulum mutes. Concurrency ab agente cum defalta unit).

Deinde rivum sequimur (ponimus nuntium in _quandoquidem nos, in hoc agente, non curamus de argumento nuntiorum ex nostro argumento, si in schedula currente existant, alioquin nostrum cyclum adventum eorum exspectabimus. Bene, intra fasciam nostram, acceptilationem nuntii aperimus, habe indicem activum (get_securitates tantum activas per defaltam redit, vide codicem clientem) securitates et salvam datorum datorum, annotando si securitas cum eisdem tickibus et commutatio datorum, si adsit, tunc (charta) simpliciter renovabitur.

Lorem nostram creaturam!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS Features web component In articulis faustis non considero, sic aptam vexillum constituimus.

In mandato nostro Lorem, faustum narravimus ubi rem applicationis quaerere et quid cum eo (launch operarium) cum gradu info log output. Sequenti output adepto dabimus tibi:

Corrumpo

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Vivit!!!

Inspice partitionem. Thema, ut videre licet, creatus est nomine, quem in codice designavimus, defectu plurium partitionum (8, e. topic_partitions - applicationis objecti parametri), cum valorem singularem pro argumento nostro (per partitiones) non definivimus. Procurator immissae in opifice omnes 8 partitiones assignatur, cum sit una tantum, sed hoc in parte de pampineis fusius disseretur.

Age, nunc ad aliam fenestram terminalem ire possumus et nuntium inanem nostro argumento mittere;

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS usura @ ostendimus nos nuntium mittimus ad thema nomine "collect_securities".

Hoc in casu, nuntius ad partitionem 6 - hoc inspicias eundo ad kafdrop on . localhost:9000

Pergentes ad fenestram terminalem cum adiutore nostro, nuntium laetum missum cum loguru videbimus:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Possumus etiam inspicere in mongo (utendo Robo3T vel Studio3T) et videre cautiones in datorum datorum;

Non sum billionaire, ideoque contenti sumus prima optione intuitu.

Operae background in Faustum, Pars II: Agentia et TeamsOperae background in Faustum, Pars II: Agentia et Teams

Felicitas et gaudium - prima agentis parata est :)

Agens paratus, dum vivat, agente novo!

Ita, indices, iter 1/3 ab hoc articulo confectum tantum habemus, sed non deficies, quia nunc facilius erit.

Nunc ergo opus est procuratore qui notitias meta colligit et in documentum collectione ponit;

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Cum hic agens informationes de certa securitate processerit, notandum est tesseram (symbolum) huius securitatis in nuntio. Ad hoc in fausto sunt Records - classes, quae nuntium declarant in argumento agentis.

In hoc casu eamus records.py et describe quid huius loci nuntium videaris;

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Ut suspicari poteras, faust pythonis genus annotationis utitur ad schema nuntium describendum, quam ob rem minima versio a bibliotheca fulta est. 3.6.

Ad agenti revertamur, pone rationes et adde:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Ut videre potes, novum parametrum transimus cum schemate ad methodum initialization thema - value_type. Praeterea, omnia eandem rationem sequuntur, ut punctum in quoquam aliud non video.

Bene, finalis tactus est vocare ad meta informationem collectionis agentis ad collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Utimur antea schematis pro nuncio nuntiato. In hoc casu, methodo .casti usus sum cum exitum ab agente exspectare non oportet, sed memorabile est. via nuntium misit ad rem:

  1. cast - non impedit quod exitum non expectet. Eventum in alium locum mittere non potes sicut nuntium.

  2. mitte - non impedit quod exitum non expectet. Potes procuratorem in rem ad quam exitum perducet.

  3. quaeritur - expectat exitum. Potes procuratorem in rem ad quam exitum perducet.

Ita ut hodie omnia cum agentibus!

Somnium Team

Quod minime in hac parte scripserim mandat. Ut supra dictum est, mandata in fausto sunt fascia circum cliccum. Re quidem vera, faustus simpliciter consuetudini nostrae mandatum interfacienti coniungit cum denotando -A key

Post nuntiatum agentium in agents.py addere munus cum decorator app.commandvocant modum conjectus у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Si ergo dixeris elenchum mandatorum, novum mandatum nostrum in eo erit;

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Ea uti possumus sicut ceteri, sic sileo fautorem adiutorem et plenam pignorum collectionem incipe:

> faust -A horton.agents start-collect-securities

Quid deinde fiet?

In altera parte, utentes exemplo reliquorum agentium, considerabimus mechanismum submersum ad quaerendum extrema in pretia mercaturae clausurae in annum et cron agentibus deducendis.

Id hodie omnia! Gratias legere :)

Code for this part

Operae background in Faustum, Pars II: Agentia et Teams

PS Sub ultima parte interrogatus sum de kafka fausta et confluentia.quid features non confluentes habent?). Videtur quod confluentia pluribus modis magis functionis sit, sed ita res se habet quod faustis plenum clientem subsidium confluentis non habet - hoc sequitur e. descriptiones client angustias in doc.

Source: www.habr.com

Add a comment