Ọrụ ndabere na Faust, Nkebi II: Ndị nnọchi anya na otu

Ọrụ ndabere na Faust, Nkebi II: Ndị nnọchi anya na otu

Isiokwu dị n’ime

  1. Akụkụ nke Mbụ: Okwu Mmalite

  2. Akụkụ II: Ndị nnọchi anya na otu

Kedu ihe anyị na-eme ebe a?

Yabụ, yabụ, akụkụ nke abụọ. Dịka e dere na mbụ, n'ime ya anyị ga-eme ihe ndị a:

  1. Ka anyị dee obere onye ahịa maka mkpụrụedemede na aiohttp yana arịrịọ maka njedebe njedebe anyị chọrọ.

  2. Ka anyị mepụta onye nnọchi anya ga-anakọta data na nchekwa na ozi meta na ha.

Ma, nke a bụ ihe anyị ga-eme maka ọrụ ahụ n'onwe ya, na n'ihe gbasara nyocha nke na-adịghị mma, anyị ga-amụta otú e si ede ndị ọrụ na-edozi ihe omume iyi site na kafka, yana otu esi ede iwu (pịa wrapper), n'ọnọdụ anyị - maka ozi ịkwanye aka na isiokwu nke onye ọrụ na-enyocha.

Ọzụzụ

Onye ahịa AlphaVantage

Mbụ, ka anyị dee obere onye ahịa aiohttp maka arịrịọ maka mkpụrụedemede.

alfavantage.py

nāpunara

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

N'ezie, ihe niile doro anya na ya:

  1. AlphaVantage API bụ nke dị mfe ma maa mma, yabụ ekpebiri m ịrịọ arịrịọ niile site na usoro a construct_query ebe ọzọ enwere oku http.

  2. Ana m ebute ubi niile snake_case maka nkasi obi.

  3. Ọfọn, ihe ịchọ mma logger.catch maka mmepụta traceback mara mma na nke na-enye nkọwa.

PS Echefula itinye akara mkpụrụedemede na mpaghara ka config.yml, ma ọ bụ mbupụ mgbanwe gburugburu ebe obibi HORTON_SERVICE_APIKEY. Anyị na-enweta akara ngosi ebe a.

Klas CRUD

Anyị ga-enwe mkpokọta nchekwa iji chekwaa ozi meta gbasara nchekwa.

nchekwa data/security.py

N'uche nke m, ọ dịghị mkpa ịkọwa ihe ọ bụla ebe a, na klas isi n'onwe ya dị nnọọ mfe.

nweta_app()

Ka anyị tinye ọrụ maka ịmepụta ihe ngwa n'ime ngwa.py

nāpunara

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Maka ugbu a, anyị ga-enwe ike ịmepụta ngwa kachasị mfe, obere oge ka e mesịrị, anyị ga-agbasa ya, Otú ọ dị, ka ị ghara ichere gị, ebe a ntụaka na klaasị App. M na-adụ ọdụ ka ị lelee klas ntọala, ebe ọ bụ na ọ na-ahụ maka ọtụtụ ntọala.

Isi akụkụ

Onye nnọchi anya maka ịnakọta na idobe ndepụta nchekwa

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Yabụ, nke mbụ anyị ga-enweta ngwa ngwa ngwa ngwa - ọ dị mfe. Na-esote, anyị na-ekwupụta n'ụzọ doro anya isiokwu maka onye ọrụ anyị ... N'ebe a, ọ bara uru ịkọwa ihe ọ bụ, ihe n'ime parameter bụ na otu esi edozi nke a n'ụzọ dị iche.

  1. Isiokwu na kafka, ọ bụrụ na anyị chọrọ ịmata kpọmkwem nkọwa, ọ ka mma ịgụ gbanyụọ. akwụkwọ, ma ọ bụ ị nwere ike ịgụ compendium na Habré na Russian, ebe ihe niile na-egosipụtakwa nke ọma :)

  2. Parameter dị n'ime, kọwara nke ọma na faust doc, na-enye anyị ohere ịhazi isiokwu ahụ ozugbo na koodu ahụ, n'ezie, nke a pụtara parampat nke ndị mmepe faust nyere, dịka ọmụmaatụ: njide, amụma njigide (site na ndabara ihichapụ, ma ị nwere ike ịtọ kọmpat), ọnụ ọgụgụ nke nkebi kwa isiokwu (scoresime ihe atụ, ihe na-erughị mkpa zuru ụwa ọnụ ngwa ngwa ngwa ngwa).

  3. N'ozuzu, onye ọrụ ahụ nwere ike ịmepụta isiokwu a na-achịkwa na ụkpụrụ zuru ụwa ọnụ, Otú ọ dị, ọ masịrị m ikwusa ihe niile n'ụzọ doro anya. Na mgbakwunye, enweghị ike ịhazi ụfọdụ parampat (dịka ọmụmaatụ, ọnụọgụ nkebi ma ọ bụ amụma njide) nke isiokwu dị na mgbasa ozi onye nnọchite anya.

    Nke a bụ ihe ọ nwere ike ịdị ka n'ejighị aka kọwaa isiokwu a:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Ọ dị mma, ugbu a ka anyị kọwaa ihe onye ọrụ anyị ga-eme :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Yabụ, na mmalite nke onye nnọchi anya, anyị na-emepe nnọkọ aiohttp maka arịrịọ site n'aka onye ahịa anyị. Ya mere, mgbe ịmalite onye ọrụ, mgbe onye ọrụ anyị na-arụ ọrụ, a ga-emepe nnọkọ ozugbo - otu, maka oge niile onye ọrụ na-agba ọsọ (ma ọ bụ ọtụtụ, ma ọ bụrụ na ị gbanwee oke. nkwenye site n'aka onye ọrụ nwere ngalaba ndabara).

Ọzọ, anyị na-eso iyi (anyị na-etinye ozi ahụ _, ebe ọ bụ na anyị, n'ime onye ọrụ a, adịghị eche banyere ọdịnaya) nke ozi sitere na isiokwu anyị, ma ọ bụrụ na ha dị na nkwụsị nke ugbu a, ma ọ bụghị ya, okirikiri anyị ga-echere mbata ha. Ọ dị mma, n'ime akaghị anyị, anyị na-abanye nnata nke ozi ahụ, nweta ndepụta nke nọ n'ọrụ (get_securities na-alaghachi naanị na-arụ ọrụ na ndabara, lee koodu ndị ahịa) ma chekwaa ya na nchekwa data, lelee ma ọ bụrụ na enwere nchekwa nwere otu akara na mgbanwe na nchekwa data , ọ bụrụ na ọ dị, mgbe ahụ, a ga-emelite ya (akwụkwọ) naanị.

Ka anyị malite ihe okike anyị!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Atụmatụ PS akụrụngwa webụ Agaghị m atụle faust na isiokwu, ya mere, anyị na-esetịpụ ọkọlọtọ kwesịrị ekwesị.

N'iwu mmalite anyị, anyị gwara faust ebe a ga-achọ ihe ngwa na ihe a ga-eme ya (ịmalite onye ọrụ) na ọkwa mmepụta log ozi. Anyị na-enweta nsonaazụ a:

nāpunara

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Ọ dị ndụ!!!

Ka anyị leba anya na nhazi nkebi. Dị ka anyị nwere ike ịhụ, e mepụtara isiokwu nwere aha anyị họpụtara na koodu ahụ, nọmba ndabara nke nkebi (8, ewepụtara na ya. isiokwu_nkebi - ngwa ihe paramita), ebe anyị akọwapụtaghị uru onye ọ bụla maka isiokwu anyị (site na nkebi). A na-ekenye onye ọrụ ewepụtara n'ime onye ọrụ niile nkebi 8, ebe ọ bụ naanị ya, mana a ga-atụle nke a n'ụzọ zuru ezu na akụkụ gbasara nchịkọta.

Ọ dị mma, ugbu a, anyị nwere ike ịga na windo ọnụ ọzọ wee ziga ozi efu na isiokwu anyị:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS na-eji @ anyị na-egosi na anyị na-ezipụ ozi na isiokwu akpọrọ "collect_securities".

N'okwu a, ozi ahụ gara na nkebi 6 - ị nwere ike ịlele nke a site na ịga na kafdrop localhost:9000

Anyị na onye ọrụ anyị na-aga na windo ọnụ, anyị ga-ahụ ozi obi ụtọ ezitere site na iji loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Anyị nwekwara ike ileba anya na mongo (iji Robo3T ma ọ bụ Studio3T) wee hụ na nchekwa dị na nchekwa data:

Abụghị m billionaire, yabụ anyị nwere afọ ojuju na nhọrọ nlele mbụ.

Ọrụ ndabere na Faust, Nkebi II: Ndị nnọchi anya na otuỌrụ ndabere na Faust, Nkebi II: Ndị nnọchi anya na otu

Obi ụtọ na ọṅụ - onye ọrụ mbụ dị njikere :)

Onye nnọchi anya dị njikere, ogologo ndụ onye nnọchi anya ọhụrụ!

Ee, ụmụ nwoke, anyị kpuchiri naanị 1/3 nke ụzọ nke isiokwu a kwadebere, mana enwela nkụda mmụọ, n'ihi na ugbu a ọ ga-adị mfe.

Yabụ ugbu a, anyị chọrọ onye nnọchi anya na-anakọta ozi meta wee tinye ya na akwụkwọ nnakọta:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Ebe ọ bụ na onye ọrụ a ga-ahazi ozi gbasara otu nchekwa, anyị kwesịrị igosi akara (akara ngosi) nke nchekwa a na ozi ahụ. Maka ebumnuche a na faust enwere Records - klaasị na-ekwupụta atụmatụ ozi na isiokwu onye nnọchite anya.

N'okwu a, ka anyị gaa ndekọ.py ma kọwaa ihe ozi maka isiokwu a kwesịrị ịdị ka:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Dịka ị siri chepụta, faust na-eji ụdị nkọwapụta nke python kọwaa atụmatụ ozi, nke mere na ọbịbịa kacha nta nke ụlọ akwụkwọ ahụ kwadoro. 3.6.

Ka anyị laghachi na onye nnọchi anya, tọọ ụdị ma tinye ya:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Dị ka ị na-ahụ, anyị na-agafe paramita ọhụrụ nwere atụmatụ gaa na usoro mmalite isiokwu - value_type. Ọzọkwa, ihe niile na-agbaso otu atụmatụ ahụ, n'ihi ya, anaghị m ahụ ihe ọ bụla na-ebi na ihe ọ bụla ọzọ.

Ọ dị mma, mmetụ ikpeazụ bụ ịgbakwunye oku na ndị na-anakọta ozi meta ka ha nweta_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Anyị na-eji atụmatụ ekwuputala na mbụ maka ozi ahụ. N'okwu a, ejiri m usoro nkedo .nkedo ebe ọ bụ na ọ dịghị mkpa ka anyị chere nsonaazụ sitere n'aka onye ọrụ ahụ, mana ọ bara uru ịkọ nke ahụ. ụzọ zipu ozi na isiokwu a:

  1. nkedo - anaghị egbochi n'ihi na ọ naghị atụ anya nsonaazụ. Ị nweghị ike izipu nsonaazụ ya na isiokwu ọzọ dị ka ozi.

  2. zipu - anaghị egbochi n'ihi na ọ naghị atụ anya nsonaazụ. Ị nwere ike ịkọwapụta onye nnọchi anya na isiokwu nke nsonaazụ ga-aga.

  3. jụọ - na-echere nsonaazụ. Ị nwere ike ịkọwapụta onye nnọchi anya na isiokwu nke nsonaazụ ga-aga.

Yabụ, nke ahụ bụ ndị nnọchi anya maka taa!

Ndị otu nrọ

Ihe ikpeazụ m kwere nkwa ide na akụkụ a bụ iwu. Dị ka e kwuru na mbụ, iwu na faust bụ ihe mkpuchi gburugburu pịa. N'ezie, faust na-ejikọta iwu omenala anyị na interface ya mgbe ọ na-akọwa igodo -A

Mgbe ndị ọrụ mara ọkwa na ndị ọrụ.py tinye ọrụ na ihe ndozi ngwa.iwuna-akpọ usoro nkedo у mkpokọta_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Ya mere, ọ bụrụ na anyị kpọọ ndepụta nke iwu, iwu ọhụrụ anyị ga-adị na ya:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Anyị nwere ike iji ya dị ka onye ọ bụla ọzọ, yabụ ka anyị malitegharịa onye ọrụ na-adịghị mma wee malite nchịkọta nchekwa zuru oke:

> faust -A horton.agents start-collect-securities

Gịnị ga-eme ọzọ?

N'akụkụ nke ọzọ, na-eji ndị ọrụ fọdụrụnụ dị ka ihe atụ, anyị ga-atụle usoro sink maka ịchọ oke na ọnụ ahịa mmechi nke ịzụ ahịa maka afọ na mmalite cron nke ndị ọrụ.

Nke ahụ bụ naanị maka taa! Daalụ maka ịgụ akwụkwọ :)

Koodu maka akụkụ a

Ọrụ ndabere na Faust, Nkebi II: Ndị nnọchi anya na otu

PS N'okpuru akụkụ ikpeazụ a jụrụ m gbasara faust na confluent kafka (kedu njirimara confluent nwere?). Ọ dị ka confluent na-arụ ọrụ n'ọtụtụ ụzọ, mana nke bụ eziokwu bụ na faust enweghị nkwado ndị ahịa zuru oke maka confluent - nke a na-esote site na. nkọwa nke mgbochi ndị ahịa na doc.

isi: www.habr.com

Tinye a comment