Background Tasks on Faust, Part II: Agents and Commands

Contents

  1. Part I: Introduction

  2. Part II: Agents and Commands

What are we doing here?

So, part two. As mentioned earlier, in it we will do the following:

  1. Write a small alphavantage client on aiohttp with requests to the endpoints we need.

  2. Create an agent that collects data on securities and the meta information about them.

That is what we will do for the project itself. As for exploring faust, we will learn how to write agents that process a stream of events from kafka, and how to write commands (a click wrapper), which in our case are used to manually push messages to the topic the agent is watching.

Preparation

AlphaVantage Client

First, let's write a small aiohttp client for requests to alphavantage.

alphavantage.py

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Actually, everything is clear from the code:

  1. The AlphaVantage API is designed quite simply and neatly, so I decided to route all requests through the _construct_query method, which in turn makes the http call.

  2. I convert all fields to snake_case for convenience.

  3. And the logger.catch decorator gives nice, informative traceback output.
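
To make the first point concrete, here is a minimal sketch of how the query URL is assembled, without any network I/O. The endpoint value is an assumption (the real one lives in horton.config.API_ENDPOINT), and build_query_url is a hypothetical helper that only mirrors the parameter merging done in _construct_query.

```python
from typing import Any, Dict
from urllib.parse import urlencode, urljoin

# Assumption: stands in for horton.config.API_ENDPOINT.
API_ENDPOINT = "https://www.alphavantage.co/"


def build_query_url(function: str, api_key: str, **kwargs: Any) -> str:
    """Hypothetical helper: merge parameters the way _construct_query does."""
    base_params: Dict[str, Any] = {"datatype": "json", "apikey": api_key}
    # Later keys win, so the client-level defaults override per-call kwargs.
    params = {"function": function, **kwargs, **base_params}
    return urljoin(API_ENDPOINT, "query/") + "?" + urlencode(params)


url = build_query_url("OVERVIEW", "demo", symbol="IBM")
print(url)
```

Because the client-level parameters are unpacked last, a caller cannot accidentally override datatype or apikey through keyword arguments.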

P.S. Don't forget to add your alphavantage token locally to config.yml, or export the HORTON_SERVICE_APIKEY environment variable. You can get a token from AlphaVantage.

CRUD class

We will have a securities collection for storing meta information about securities.

database/security.py

In my opinion, there is nothing to explain here, and the base class itself is quite simple.
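
Since the listing for database/security.py is not reproduced here, the following is only an illustrative stand-in, not the author's class. It assumes that update_one takes a filter, a payload and an upsert flag, which is how it is called later in the article, and it keeps the data in memory instead of MongoDB. The name InMemorySecurityCRUD is hypothetical.

```python
import asyncio
from typing import Any, Dict, List, Optional


class InMemorySecurityCRUD:
    """Hypothetical in-memory stand-in; the real class is backed by a database."""

    _items: List[Dict[str, Any]] = []

    @classmethod
    async def find_one(cls, query: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        # Return the first stored document whose fields match the query.
        for item in cls._items:
            if all(item.get(key) == value for key, value in query.items()):
                return item
        return None

    @classmethod
    async def update_one(
        cls, query: Dict[str, Any], payload: Dict[str, Any], upsert: bool = False
    ) -> bool:
        item = await cls.find_one(query)
        if item is not None:
            item.update(payload)  # existing document: merge the new fields
            return True
        if upsert:
            cls._items.append({**query, **payload})  # missing: insert it
            return True
        return False


async def demo() -> Optional[Dict[str, Any]]:
    key = {"symbol": "IBM", "exchange": "NYSE"}
    await InMemorySecurityCRUD.update_one(key, {"name": "IBM Corp"}, upsert=True)
    await InMemorySecurityCRUD.update_one(key, {"sector": "Technology"})
    return await InMemorySecurityCRUD.find_one(key)


record = asyncio.run(demo())
print(record)
```

The point of the sketch is the upsert behaviour: the first call creates the document, the second only updates it.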

get_app()

Let's add a function that creates the application object in app.py

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

For now we will have the simplest possible application setup; we will extend it a little later. Still, so as not to keep you waiting, see the reference for the App class. I also advise you to look at the settings class, since it is responsible for most of the settings.

Main part

Agent for collecting and storing the list of securities

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    pass

So, first we get the faust application object, which is quite simple. Next, we explicitly declare a topic for our agent... Here it is worth explaining what a topic is, what the internal parameter does, and how this can be arranged differently.

  1. Topics in kafka: if you want the exact definition, it is best to read the official documentation, or you can read the summary on Habr in Russian, where everything is also described quite accurately :)

  2. The internal parameter, described quite well in the faust docs, lets us configure the topic directly in code. Naturally, this means the parameters provided by the faust developers, for example: retention, retention policy (delete by default, but you can also set compact), and the number of partitions per topic (partitions, for example to make it smaller than the global value of the faust application).

  3. In general, the agent can create a managed topic with the global values by itself, but I like to declare everything explicitly. Besides, some parameters (for example, the number of partitions or the retention policy) cannot be set for a topic that is declared in the agent decorator.

    This is how it might look without defining the topic manually:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    pass

Well, now let's describe what our agent will do :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

So, at the start of the agent we open an aiohttp session for requests through our client. This means that when the worker starts and our agent is launched, a session is opened right away: a single one for the whole time the worker is running (or several, if you change the agent's concurrency parameter from its default of one).

Next, we iterate over the stream of messages from our topic (we bind each message to _, since in this agent we do not care about its content), if there are any at the current offset; otherwise the loop waits for them to arrive. Inside the loop we log the receipt of the message, get the list of active securities (get_securities returns only active ones by default, see the client code) and save it to the database, checking whether a security with the same ticker and exchange already exists there. If it does, it is simply updated.
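
The control flow described above does not depend on Faust itself. As a rough sketch, assuming a plain asyncio.Queue in place of the Kafka topic (queue_stream, collect and the None sentinel are hypothetical names for this illustration, not part of Faust), the same shape looks like this:

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, List, Optional


async def queue_stream(queue: "asyncio.Queue[Optional[str]]") -> AsyncIterator[str]:
    """Yield messages until a None sentinel arrives (stand-in for a stream)."""
    while True:
        message = await queue.get()  # waits here while the "topic" is empty
        if message is None:
            return
        yield message


async def collect(stream: AsyncIterable[str], log: List[str]) -> AsyncIterator[bool]:
    log.append("session opened")  # happens once, before the first message
    try:
        async for _ in stream:  # payload ignored, as in collect_securities
            log.append("collecting")
            yield True
    finally:
        log.append("session closed")


async def main() -> List[str]:
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue()
    for message in ("", "", None):  # two empty messages, then stop
        queue.put_nowait(message)
    log: List[str] = []
    results = [result async for result in collect(queue_stream(queue), log)]
    log.append(f"results={results}")
    return log


log = asyncio.run(main())
print(log)
```

The resource is acquired once per running agent, while the body of the loop runs once per message.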

Let's launch our creation!

> docker-compose up -d
... Starting containers ...
> faust -A horton.agents worker --without-web -l info

P.S. I will not cover the features of the web component in these articles, so we set the corresponding flag.

In our launch command we told faust where to look for the application object and what to do with it (start a worker), along with the log level. We get the following output:

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... logs, logs, logs ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

It's alive!!!

Let's look at the partition set. As we can see, a topic was created with the name we specified in the code and with the default number of partitions (8, taken from topic_partitions, a parameter of the application object), since we did not set an individual value for our topic (via partitions). The agent launched in the worker is assigned all 8 partitions, because it is the only one, but this will be covered in more detail in the part about clustering.

Well, now we can switch to another terminal window and send an empty message to our topic:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. By using @ we indicate that we are sending the message to the topic named "collect_securities".

In this case the message went to partition 6; you can check this by opening kafdrop at localhost:9000.

Switching to the terminal window with our worker, we will see a happy message logged via loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

We can also look into mongo (using Robo3T or Studio3T) and see that the securities are in the database:

I am not a billionaire, so we make do with the first viewing option.

Happiness and joy: the first agent is ready :)

The agent is ready, long live the new agent!

Yes, gentlemen, we have covered only 1/3 of the path planned for this article, but do not despair, because it gets easier from here.

So now we need an agent that collects meta information and puts it into the collection document:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Since this agent will process information about a specific security, we need to specify the ticker (symbol) of that security in the message. For this purpose faust has Records: classes that declare the message schema for the agent's topic.

So let's go to records.py and describe what a message for this topic should look like:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

As you might have guessed, faust uses python type annotations to describe the message schema, which is why the minimum Python version supported by the library is 3.6.
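
The idea of a typed message schema can be illustrated without Faust. The sketch below is only an analogy built on the standard library (dataclasses and json); the class name and the to_bytes and from_bytes methods are hypothetical and are not how faust.Record is implemented.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class CollectSecurityOverviewSketch:
    """Hypothetical stdlib analogue of the CollectSecurityOverview record."""

    symbol: str
    exchange: str

    def to_bytes(self) -> bytes:
        # Serialize the annotated fields into a JSON payload.
        return json.dumps(asdict(self)).encode()

    @classmethod
    def from_bytes(cls, payload: bytes) -> "CollectSecurityOverviewSketch":
        # Rebuild a typed object from the payload read off the topic.
        return cls(**json.loads(payload))


message = CollectSecurityOverviewSketch(symbol="IBM", exchange="NYSE")
payload = message.to_bytes()
restored = CollectSecurityOverviewSketch.from_bytes(payload)
print(payload, restored)
```

The annotations do double duty: they document the message and define which fields end up in the payload.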

Let's return to the agent, set the types and finish it:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

As you can see, we pass a new parameter with the schema, value_type, to the topic initialization method. Beyond that, everything follows the same pattern, so I see no need to dwell on anything else.

Well, the finishing touch is to add a call to the meta information collection agent in collect_securities:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

We use the previously declared schema for the message. In this case I used the .cast method, since we do not need to wait for a result from the agent, but it is worth mentioning the ways to send a message to a topic:

  1. cast: does not block, because it does not wait for a result. The result cannot be sent to another topic as a message.

  2. send: does not block, because it does not wait for a result. You can specify an agent to whose topic the result will go.

  3. ask: waits for the result. You can specify an agent to whose topic the result will go.
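
The difference between the fire-and-forget and the waiting styles can be sketched with plain asyncio. This is an analogy under stated assumptions, not Faust's implementation: the agent, cast and ask functions below are hypothetical, the queue stands in for the topic, and a Future stands in for the reply channel. The send variant sits between the two and is left out for brevity.

```python
import asyncio
from typing import List, Optional, Tuple


async def agent(queue: asyncio.Queue, seen: List[str]) -> None:
    """Process messages forever; answer only when a reply future is attached."""
    while True:
        value, reply = await queue.get()
        seen.append(value)
        if reply is not None:
            reply.set_result(value.upper())
        queue.task_done()


async def cast(queue: asyncio.Queue, value: str) -> None:
    # Fire and forget: no reply future, nothing to wait for.
    await queue.put((value, None))


async def ask(queue: asyncio.Queue, value: str) -> str:
    # Attach a future and wait until the agent fills it in.
    reply: asyncio.Future = asyncio.get_running_loop().create_future()
    await queue.put((value, reply))
    return await reply


async def main() -> Tuple[List[str], Optional[str]]:
    queue: asyncio.Queue = asyncio.Queue()
    seen: List[str] = []
    worker = asyncio.create_task(agent(queue, seen))
    await cast(queue, "ibm")  # returns as soon as the message is queued
    answer = await ask(queue, "aapl")  # returns only after processing
    await queue.join()
    worker.cancel()
    return seen, answer


seen, answer = asyncio.run(main())
print(seen, answer)
```

In collect_securities the overview agent's result is never used, which is why the fire-and-forget style is enough there.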

So, that's all for agents today!

The Dream Team

The last thing I promised to write about in this part is commands. As mentioned earlier, commands in faust are a wrapper around click. In fact, faust simply attaches our custom command to its own interface when the -A key is specified.

After the agents declared in agents.py, add a function with the app.command decorator that calls the cast method of collect_securities:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

So, if we request the list of commands, our new command will be in it:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.
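
In the listing above, the command name and its help text match the function name and the docstring. A rough sketch of that mapping, using argparse from the standard library rather than click (the COMMANDS registry and the command decorator are hypothetical and only mimic the observable behaviour):

```python
import argparse
from typing import Callable, Dict

# Hypothetical registry: dashed command name -> function.
COMMANDS: Dict[str, Callable[[], str]] = {}


def command(func: Callable[[], str]) -> Callable[[], str]:
    """Register func under a dashed name, the way the listing shows it."""
    COMMANDS[func.__name__.replace("_", "-")] = func
    return func


@command
def start_collect_securities() -> str:
    """Collect securities and overview."""
    return "message cast to collect_securities"


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="sketch")
    subparsers = parser.add_subparsers(dest="command", required=True)
    for name, func in COMMANDS.items():
        # The docstring becomes the help line of the subcommand.
        subparsers.add_parser(name, help=func.__doc__)
    return parser


args = build_parser().parse_args(["start-collect-securities"])
result = COMMANDS[args.command]()
print(args.command, "->", result)
```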

We can use it like any other command, so let's restart the faust worker and start a full collection of securities:

> faust -A horton.agents start-collect-securities

What happens next?

In the next part, using the remaining agents as examples, we will look at the sink mechanism for finding extremes in the closing prices of trading over a year, and at launching agents.

That's all for today! Thanks for reading :)

Code for this part

P.S. Under the previous part I was asked about faust and confluent kafka (what features does confluent have?). It seems that confluent is more functional in many respects, but the fact is that faust does not have full client support for confluent; this follows from the description of client restrictions in the docs.

Source: www.habr.com
