Mesebetsi ea morao-rao ho Faust, Karolo ea II: Baemeli le Lihlopha

Mesebetsi ea morao-rao ho Faust, Karolo ea II: Baemeli le Lihlopha

Lethathamo la tse kahare

  1. Karolo ea I: Selelekela

  2. Karolo ea II: Baemeli le Lihlopha

Re etsang moo?

Kahoo, ho joalo, karolo ea bobeli. Joalokaha ho ngotsoe pejana, ho eona re tla etsa tse latelang:

  1. Ha re ngoleng moreki e monyane bakeng sa alphavantage ho aiohttp ka likopo tsa liphetho tseo re li hlokang.

  2. Ha re theheng moemeli ea tla bokella data mabapi le matšeliso le lintlha tsa meta ho tsona.

Empa, sena ke seo re tla se etsetsa morero ka boeona, 'me mabapi le lipatlisiso tse fokolang, re tla ithuta ho ngola baemeli ba sebetsanang le liketsahalo tsa molapo ho tloha kafka, hammoho le mokhoa oa ho ngola litaelo (tobetsa wrapper), molemong oa rona - bakeng sa melaetsa ea push push to the topic that agent is monitoring.

Ho lokisetsa

AlphaVantage Client

Taba ea pele, ha re ngoleng moreki e nyane oa aiohttp bakeng sa likopo tsa alphavantage.

alphavantage.py

mosenyi

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Ha e le hantle, ntho e 'ngoe le e' ngoe e hlakile ho eona:

  1. AlphaVantage API e bonolo ebile e entsoe ka bokhabane, kahoo ke nkile qeto ea ho etsa likopo tsohle ka mokhoa ona construct_query moo ho nang le pitso ea http.

  2. Ke tlisa masimo 'ohle ho snake_case molemong.

  3. Hantle, mokhabiso oa logger.catch bakeng sa tlhahiso e ntle le e rutang ea traceback.

PS Se ke oa lebala ho kenyelletsa lets'oao la alphavantage sebakeng sa heno ho config.yml, kapa ho romella kantle ho naha phapang ea tikoloho. HORTON_SERVICE_APIKEY. Re amohela letshwao mona.

Sehlopha sa CRUD

Re tla ba le pokello ea li-securities ho boloka lintlha tsa meta mabapi le matšeliso.

database/security.py

Ka maikutlo a ka, ha ho hlokahale ho hlalosa letho mona, 'me sehlopha sa motheo ka boeona se bonolo haholo.

fumana_app()

Ha re kenye tšebetsong bakeng sa ho theha ntho ea ts'ebeliso ho app.py

mosenyi

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Hajoale re tla ba le ts'ebetso e bonolo ka ho fetisisa ea ts'ebeliso, hamorao re tla e atolosa, leha ho le joalo, e le hore re se ke ra lula re emetse, mona. litšupiso ho sehlopha sa App. Ke boetse ke u eletsa hore u shebe sehlopha sa litlhophiso, kaha ke sona se ikarabellang bakeng sa litlhophiso tse ngata.

Karolo e kholo

Moemeli oa ho bokella le ho boloka lethathamo la matšeliso

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Kahoo, pele re fumana ntho e sebetsang ka ho fetesisa - e bonolo haholo. Ka mor'a moo, re phatlalatsa ka ho hlaka sehlooho bakeng sa moemeli oa rona ... Mona ke habohlokoa ho bolela hore na ke eng, hore na parameter ea ka hare ke eng le hore na sena se ka hlophisoa joang ka tsela e fapaneng.

  1. Lihlooho tsa kafka, haeba re batla ho tseba tlhaloso e nepahetseng, ho molemo ho bala tima. tokomane, kapa u ka bala kakaretso ho Habré ka Serussia, moo ntho e 'ngoe le e' ngoe e bonts'ang ka nepo :)

  2. Paramethara ka hare, e hlalositsoeng hantle ho faust doc, e re lumella hore re lokise sehlooho ka ho toba ka khoutu, ehlile, sena se bolela litekanyo tse fanoeng ke baetsi ba faust, mohlala: leano la ho boloka, ho boloka (ka ho hlakola kamehla, empa u ka beha diseke), palo ea likarolo ka sehlooho (likaroloanaho etsa, mohlala, ka tlase ho bohlokoa ba lefats'e lits'ebetso tse potlakileng).

  3. Ka kakaretso, moemeli a ka etsa sehlooho se laoloang ka litekanyetso tsa lefats'e, leha ho le joalo, ke rata ho phatlalatsa ntho e 'ngoe le e' ngoe ka ho hlaka. Ho phaella moo, litekanyo tse ling (mohlala, palo ea likarolo kapa pholisi ea ho boloka) ea sehlooho papatsong ea moemeli e ke ke ea hlophisoa.

    Mona ke hore na e ka shebahala joang ntle le ho hlalosa sehlooho ka letsoho:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Joale, ha re hlalose hore na moemeli oa rona o tla etsa eng :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Kahoo, qalong ea moemeli, re bula seboka sa aiohttp bakeng sa likopo ka moreki oa rona. Kahoo, ha ho qala mosebeletsi, ha moemeli oa rona a qalisoa, lenaneo le tla buloa hang-hang - e le 'ngoe, bakeng sa nako eohle eo mosebeletsi a ntseng a sebetsa (kapa tse' maloa, haeba u fetola parameter. concurrency ho tsoa ho moemeli ea nang le yuniti ea kamehla).

Ka mor'a moo, re latela molapo (re beha molaetsa ho _, kaha rona, moemeli enoa, ha re tsotelle litaba) tsa melaetsa e tsoang sehloohong sa rona, haeba e le teng ka nako ea hona joale, ho seng joalo potoloho ea rona e tla emela ho fihla ha bona. Hantle, ka har'a loop ea rona, re kenya receipt ea molaetsa, re fumana lethathamo la ts'ebetso (get_securities returns only active by default, bona client code) le ho e boloka ho database, ho hlahloba hore na ho na le ts'ireletso e nang le ticker e tšoanang le. phapanyetsano ho database , haeba ho na le, joale (pampiri) e tla nchafatsoa feela.

Ha re thakhoseng popo ea rona!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Likarolo tsa PS karolo ea tepo Ha ke na ho nahana ka maikutlo a fosahetseng lihloohong, kahoo re beha folakha e loketseng.

Taelong ea rona ea ho qala, re ile ra bolella faust hore na u ka batla ntho ea kopo hokae le hore na u lokela ho etsa eng ka eona (hlakola mosebeletsi) ka boemo ba tlhahiso ea lintlha. Re fumana tlhahiso e latelang:

mosenyi

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

E a phela!!!

Ha re shebeng sete ea karohano. Joalokaha re bona, sehlooho se entsoe ka lebitso leo re le khethileng ka khoutu, palo e sa feleng ea likarolo (8, e nkiloeng ho lihlooho_likarolo - paramethara ea ntho ea ts'ebeliso), kaha ha rea ​​​​ka ra bolela boleng ba motho ka mong bakeng sa sehlooho sa rona (ka li-partitions). Moemeli ea qalileng ho mosebeletsi o abetsoe likarolo tsohle tse 8, kaha ke eona feela, empa sena se tla tšohloa ka ho qaqileng haholoanyane karolong e mabapi le ho kopanya.

Joale, re ka ea fensetereng e 'ngoe ea terminal ebe re romella molaetsa o se nang letho sehloohong sa rona:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS sebelisa @ re bontša hore re romella molaetsa sehloohong se bitsoang "collect_securities".

Tabeng ena, molaetsa o ile oa ea karolong ea 6 - o ka hlahloba sena ka ho ea kafdrop on localhost:9000

Ha re ea fensetereng ea terminal le mosebeletsi oa rona, re tla bona molaetsa o monate o rometsoeng ho sebelisoa loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Re ka boela ra sheba mongo (re sebelisa Robo3T kapa Studio3T) 'me ra bona hore matšeliso a teng ka har'a database:

Ha ke bilione, ka hona re khotsofetse ka khetho ea pele ea ho shebella.

Mesebetsi ea morao-rao ho Faust, Karolo ea II: Baemeli le LihlophaMesebetsi ea morao-rao ho Faust, Karolo ea II: Baemeli le Lihlopha

Thabo le thabo - moemeli oa pele o se a loketse :)

Moemeli o itokisitse, phela nako e telele moemeli e mocha!

E, banna ba ka, re koahetse 1/3 feela ea tsela e lokiselitsoeng ke sehlooho sena, empa le se ke la nyahama, hobane joale ho tla ba bonolo.

Kahoo joale re hloka moemeli ea bokellang lintlha tsa meta ebe o li kenya tokomaneng ea pokello:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Kaha moemeli enoa o tla sebetsana le tlhahisoleseling mabapi le ts'ireletso e itseng, re hloka ho bonts'a ticker (letšoao) la ts'ireletso ena molaetseng. Bakeng sa morero ona ka faust ho na le Records - litlelase tse phatlalatsang morero oa molaetsa sehloohong sa moemeli.

Tabeng ena, ha re eeng ho litlaleho.py 'me u hlalose hore na molaetsa oa sehlooho sena o lokela ho shebahala joang:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Joalo ka ha u ka be u nahanne, faust o sebelisa tlhaloso ea mofuta oa python ho hlalosa schema ea molaetsa, ke ka lebaka leo mofuta o fokolang o tšehetsoeng ke laeborari e leng. 3.6.

Ha re khutlele ho moemeli, re behe mefuta ebe re e eketsa:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Joalokaha u bona, re fetisa parameter e ncha e nang le moralo ho mokhoa oa ho qala sehlooho - value_type. Ho feta moo, ntho e 'ngoe le e' ngoe e latela morero o tšoanang, kahoo ha ke bone ntlha ea ho lula holim'a ntho leha e le efe e 'ngoe.

Taba ea ho qetela ke ho kenya mohala ho moemeli oa pokello ea lintlha tsa meta ho bokella_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Re sebelisa leano le phatlalalitsoeng pele bakeng sa molaetsa. Tabeng ena, ke sebelisitse mokhoa oa .cast kaha ha ho hlokahale hore re emele sephetho ho tsoa ho moemeli, empa ho bohlokoa ho bolela seo. mekhoa romela molaetsa sehloohong:

  1. cast - ha e thibele hobane e sa lebella sephetho. O ke ke wa romela sephetho sehloohong se seng e le molaetsa.

  2. romela - ha e thibele hobane e sa lebella sephetho. U ka hlakisa moemeli sehloohong seo sephetho se tla ea ho sona.

  3. botsa - o emetse sephetho. U ka hlakisa moemeli sehloohong seo sephetho se tla ea ho sona.

Kahoo, ke tsohle tse nang le baemeli ba kajeno!

Sehlopha sa litoro

Ntho ea ho qetela eo ke tšepisitseng ho e ngola karolong ena ke litaelo. Joalokaha ho boletsoe pejana, litaelo tsa faust ke ho pota-pota ho pota-pota. Ha e le hantle, faust e kopanya taelo ea rona ea tloaelo ho sebopeho sa eona ha e totobatsa -A senotlolo

Ka mor'a hore baemeli ba phatlalalitsoeng ba kene baemeli.py eketsa tšebetso ka mokhabiso app.commandho bitsa mokhoa lahle у bokella_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Kahoo, haeba re bitsa lethathamo la litaelo, taelo ea rona e ncha e tla ba ho eona:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Re ka e sebelisa joalo ka motho e mong le e mong, ka hona, ha re qaleng hape mosebeletsi ea sa sebetseng hantle 'me re qale pokello e felletseng ea litšireletso:

> faust -A horton.agents start-collect-securities

Ho tla latela eng?

Karolong e latelang, re sebelisa mahlahana a setseng e le mohlala, re tla nahana ka mochine oa sink bakeng sa ho batla ho feteletseng ka theko ea ho koala ea khoebo bakeng sa selemo le ho qalisoa ha li-agent.

Ke phetho bakeng sa kajeno! Ke leboha ho bala :)

Khoutu bakeng sa karolo ena

Mesebetsi ea morao-rao ho Faust, Karolo ea II: Baemeli le Lihlopha

PS Tlas'a karolo ea ho qetela ke ile ka botsoa ka faust le confluent kafka (confluent e na le likarolo life?). Ho bonahala eka confluent e sebetsa haholoanyane ka mekhoa e mengata, empa 'nete ke hore faust ha e na tšehetso e felletseng ea bareki bakeng sa confluent - sena se latela litlhaloso tsa lithibelo tsa bareki ho doc.

Source: www.habr.com

Eketsa ka tlhaloso