Background tasks on Faust, Part II: Agents and Commands

Contents

  1. Part I: Introduction

  2. Part II: Agents and Commands

What are we doing here?

So, part two. As mentioned earlier, in this part we will do the following:

  1. Write a small aiohttp client for alphavantage with requests to the endpoints we need.

  2. Create an agent that collects data on securities and meta information about them.

That is what we will build for the project itself. As far as exploring faust goes, we will learn how to write agents that process a stream of events from kafka, and how to write commands (a wrapper around click), which in our case push a message by hand into the topic the agent is watching.

Preparation

AlphaVantage client

First, let's write a small aiohttp client for requests to alphavantage.

alphavantage.py

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Everything here is fairly self-explanatory:

  1. The AlphaVantage API is designed simply and neatly, so I decided to route all requests through the _construct_query method, which in turn makes the HTTP call.

  2. I convert all field names to snake_case for convenience.

  3. And the logger.catch decorator gives nicely formatted, informative traceback output.
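As a rough illustration of the first two points, the sketch below shows how the request URL and query string come together and what the snake_case conversion does to field names. It uses only the standard library. The endpoint value, the "demo" key, and the build_url and to_snake_case helpers are assumptions made for illustration; the client itself relies on stringcase.snakecase, whose output may differ on edge cases.

```python
import re
from typing import Any, Dict
from urllib.parse import urlencode, urljoin

# Assumed endpoint; the real value comes from horton.config.API_ENDPOINT.
API_ENDPOINT = "https://www.alphavantage.co/"


def build_url(function: str, api_key: str, **kwargs: str) -> str:
    """Merge parameters in the same order as _construct_query does."""
    params = {"function": function, **kwargs, "datatype": "json", "apikey": api_key}
    return urljoin(API_ENDPOINT, "query/") + "?" + urlencode(params)


def to_snake_case(name: str) -> str:
    """Simplified stand-in for stringcase.snakecase (CamelCase input only)."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def format_fields(data: Dict[str, Any]) -> Dict[str, Any]:
    """Rename every key of a response dictionary to snake_case."""
    return {to_snake_case(field): item for field, item in data.items()}


url = build_url("OVERVIEW", "demo", symbol="IBM")
fields = format_fields({"Symbol": "IBM", "MarketCapitalization": "1"})
```

Because the shared parameters are merged last, datatype and apikey always win over anything passed in through kwargs.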

P.S. Don't forget to add your alphavantage token to the local config.yml, or export the HORTON_SERVICE_APIKEY environment variable. You can get a token on the AlphaVantage website.

CRUD class

We will have a securities collection for storing meta information about securities.

database/security.py

In my opinion there is nothing to explain here, and the base class itself is quite simple.

get_app()

Let's add a function that creates the application object to app.py

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

For now this is the simplest possible way to create the application; we will extend it a little later. So as not to keep you waiting, though, see the reference for the App class in the faust documentation. I also recommend looking at the settings class, since it is responsible for most of the configuration.

Main part

Agent for collecting and storing the list of securities

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    pass

So, first we get the faust application object, which is simple enough. Next, we explicitly declare a topic for our agent. Here it is worth explaining what a topic is, what the internal parameter does, and how this could be set up differently.

  1. Topics in kafka: if you want the exact definition, it is best to read the official documentation, or you can read the summary on Habr in Russian, where everything is also described quite accurately :)

  2. The internal parameter, described quite well in the faust documentation, lets us configure the topic directly in code. Naturally, this covers only the parameters exposed by the faust developers, for example: retention, retention policy (delete by default, but you can set compact), and the number of partitions per topic (partitions, for example to use fewer than the global value of the faust application).

  3. In general, the agent can create a managed topic with the global values on its own, but I like to declare everything explicitly. Besides, some topic parameters (for example, the number of partitions or the retention policy) cannot be configured in the agent declaration.

    Here is how it could look without defining the topic by hand:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    pass
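Going the other way, an explicitly declared topic can carry its own settings. The fragment below is a minimal sketch of that, assuming a broker at kafka://localhost:9092; the partition count and retention values are illustrative and are not taken from the project.

```python
from datetime import timedelta

import faust

# Assumed broker address, for illustration only.
app = faust.App("horton", broker="kafka://localhost:9092")

# Explicit per-topic settings; the values here are illustrative only.
collect_securities_topic = app.topic(
    "collect_securities",
    internal=True,                 # let faust create and manage the topic
    partitions=4,                  # instead of the app-wide topic_partitions value
    retention=timedelta(days=1),   # how long messages are kept
    compacting=False,              # retention policy: do not compact ...
    deleting=True,                 # ... delete old messages instead
)
```

Declaring the topic object up front also gives other code a handle for sending messages to it without going through the agent.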

Well, now let's describe what our agent will do :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

So, at the start of the agent we open an aiohttp session for the requests made through our client. This means that when the worker starts and our agent is launched, a session is opened right away: one session for the whole lifetime of the worker (or several, if you change the agent's concurrency parameter from its default of one).
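The shape of such an agent can be tried out without kafka. The sketch below is a standard-library imitation, not faust code: FakeSession and fake_stream are hypothetical stand-ins for aiohttp.ClientSession and the faust stream, used only to show that the session is opened once while the loop body runs once per message.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, List


class FakeSession:
    """Stand-in for aiohttp.ClientSession that counts how often it is opened."""

    opened = 0

    async def __aenter__(self) -> "FakeSession":
        FakeSession.opened += 1
        return self

    async def __aexit__(self, *exc_info) -> None:
        return None


async def fake_stream(count: int) -> AsyncIterator[None]:
    # Stands in for the faust stream: yields `count` empty messages.
    for _ in range(count):
        await asyncio.sleep(0)
        yield None


async def agent(stream: AsyncIterable[None]) -> AsyncIterator[bool]:
    async with FakeSession():  # opened once, before the first message
        async for _ in stream:  # body runs once per message
            yield True


async def main() -> List[bool]:
    return [result async for result in agent(fake_stream(3))]


results = asyncio.run(main())
```

Three messages produce three results, yet FakeSession.opened stays at 1, which is the property the real agent relies on.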

Next, we iterate over the stream of messages from our topic (we bind each message to _, since in this agent we don't care about its content). If there are messages at the current offset they are processed; otherwise the loop waits for them to arrive. Inside the loop we log that a message was received, fetch the list of active securities (get_securities returns only active ones by default, see the client code), and save it to the database. For each security we check whether one with the same ticker and exchange already exists in the database; if it does, that record is simply updated.
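The "update if it exists, insert otherwise" behaviour can be sketched in memory. The class below is a hypothetical stand-in written for this illustration; it is not the project's SecurityCRUD, and it only assumes that update_one takes a query, a payload, and an upsert flag, as the agent code does.

```python
import asyncio
from typing import Any, Dict, List


class InMemorySecurityCRUD:
    """Hypothetical stand-in for SecurityCRUD; not the project's class."""

    def __init__(self) -> None:
        self.documents: List[Dict[str, Any]] = []

    async def update_one(
        self, query: Dict[str, Any], payload: Dict[str, Any], upsert: bool = False
    ) -> None:
        for document in self.documents:
            # A document matches when every field of the query is equal.
            if all(document.get(key) == value for key, value in query.items()):
                document.update(payload)
                return
        if upsert:
            # No match: insert a new document built from query and payload.
            self.documents.append({**query, **payload})


async def main() -> List[Dict[str, Any]]:
    crud = InMemorySecurityCRUD()
    key = {"symbol": "IBM", "exchange": "NYSE"}
    await crud.update_one(key, {**key, "name": "IBM"}, upsert=True)
    await crud.update_one(key, {**key, "name": "IBM Corp"}, upsert=True)
    return crud.documents


documents = asyncio.run(main())
```

Running the agent twice therefore does not duplicate securities: the second pass finds the same symbol and exchange and overwrites the stored fields.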

Let's launch our creation!

> docker-compose up -d
... containers starting ...
> faust -A horton.agents worker --without-web -l info

P.S. I won't cover faust's web component in these articles, so we set the corresponding flag (--without-web).

In the launch command we told faust where to look for the application object and what to do with it (start a worker) at the info log level. We get the following output:

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... logs, logs, logs ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

It's alive!!!

Let's look at the partition set. As we can see, a topic was created with the name we gave it in the code and with the default number of partitions (8, taken from topic_partitions, a parameter of the application object), since we did not set an individual value for our topic (via partitions). The agent running in the worker was assigned all 8 partitions, since it is the only one, but this will be covered in more detail in the part about clustering.

Now we can go to another terminal window and send an empty message to our topic:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. The @ prefix tells faust that the target is the agent named "collect_securities"; the message is delivered to the topic that agent consumes.

In this case the message went to partition 6. You can verify this by opening kafdrop on localhost:9000

Switching to the terminal window with our worker, we see a cheerful message logged via loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

We can also look into mongo (using Robo3T or Studio3T) and see that the securities are in the database.

I'm not a billionaire, so I make do with the first of these viewers.

Happiness and joy: the first agent is ready :)

The agent is ready, long live the new agent!

Yes, gentlemen, we have covered only a third of the path laid out by this article, but don't lose heart, because it gets easier from here.

So now we need an agent that collects meta information and puts it into the collection document:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Since this agent will process information about a specific security, we need to specify the ticker (symbol) of that security in the message. For this purpose faust has Records: classes that declare the message schema for the agent's topic.

So let's go to records.py and describe what a message for this topic should look like:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

As you might have guessed, faust uses Python type annotations to describe the message schema, which is why the minimum Python version supported by the library is 3.6.
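To get a feel for how annotations can double as a schema, here is a standard-library analogy built on dataclasses. It is not how faust.Record is implemented; the class name with the Sketch suffix and the loads helper are made up for this example.

```python
import json
from dataclasses import asdict, dataclass, fields


@dataclass
class CollectSecurityOverviewSketch:
    symbol: str
    exchange: str


def loads(raw: bytes) -> CollectSecurityOverviewSketch:
    """Build a message from JSON, checking it against the annotations."""
    payload = json.loads(raw)
    for field in fields(CollectSecurityOverviewSketch):
        # field.type is the annotated class (str for both fields here).
        if not isinstance(payload.get(field.name), field.type):
            raise TypeError(f"{field.name} must be {field.type.__name__}")
    return CollectSecurityOverviewSketch(**payload)


message = CollectSecurityOverviewSketch(symbol="IBM", exchange="NYSE")
raw = json.dumps(asdict(message)).encode()
restored = loads(raw)
```

The round trip returns an equal object, and a payload with a missing or wrongly typed field is rejected before it reaches the agent body.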

Let's return to the agent, set the types, and finish it:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

As you can see, we pass a new parameter with the schema to the topic initialization method: value_type. Everything else follows the same pattern, so I see no point in dwelling on it.

Well, the final touch is to add a call to the meta information collection agent in collect_securities:

....
for security in securities:
    await SecurityCRUD.update_one(
        {"symbol": security["symbol"], "exchange": security["exchange"]},
        security,
        upsert=True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(
            symbol=security["symbol"], exchange=security["exchange"]
        )
    )
....

For the message we use the schema declared earlier. In this case I used the .cast method, since we don't need to wait for a result from the agent, but it is worth listing the ways to send a message to a topic:

  1. cast: does not block, because it does not expect a result. The result cannot be sent on to another topic as a message.

  2. send: does not block, because it does not expect a result. You can specify an agent whose topic will receive the result.

  3. ask: waits for the result. You can specify an agent whose topic will receive the result.
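The difference between fire-and-forget and request-reply can be shown with a toy model on plain asyncio. This is an analogy only and does not use or reproduce faust: MiniAgent is a hypothetical class whose cast and ask methods mimic the blocking behaviour described in the list (send is left out, as it differs from cast only in where the result may be forwarded).

```python
import asyncio
from typing import Any, List, Optional, Tuple

Message = Tuple[Any, Optional[asyncio.Future]]


class MiniAgent:
    """Toy agent that doubles every value it receives. Not faust's implementation."""

    def __init__(self) -> None:
        self.queue: "asyncio.Queue[Message]" = asyncio.Queue()
        self.processed: List[Any] = []

    async def cast(self, value: Any) -> None:
        # Fire and forget: enqueue without a reply channel.
        await self.queue.put((value, None))

    async def ask(self, value: Any) -> Any:
        # Request and reply: enqueue with a future and wait for it.
        reply: asyncio.Future = asyncio.get_running_loop().create_future()
        await self.queue.put((value, reply))
        return await reply

    async def run(self) -> None:
        while True:
            value, reply = await self.queue.get()
            result = value * 2
            self.processed.append(result)
            if reply is not None:
                reply.set_result(result)


async def main() -> Tuple[Any, List[Any]]:
    agent = MiniAgent()
    worker = asyncio.create_task(agent.run())
    await agent.cast(1)          # returns as soon as the message is queued
    answer = await agent.ask(2)  # returns only after the agent has replied
    worker.cancel()
    return answer, agent.processed


answer, processed = asyncio.run(main())
```

In the article's agent, cast is the natural choice: collect_securities queues one overview request per security and has no use for the True values that collect_security_overview yields.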

So, that's all on agents for today!

Dream team

The last thing I promised to cover in this part is commands. As mentioned earlier, commands in faust are a wrapper around click. In effect, faust simply attaches our custom command to its own interface when the -A switch is given.

After the agents declared in agents.py, add a function with the app.command decorator that calls the cast method of collect_securities:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

So, if we list the available commands, our new command will be among them:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

We can use it like any other command, so let's restart the faust worker and kick off a full collection of securities:

> faust -A horton.agents start-collect-securities

What's next?

In the next part, using the remaining agents as examples, we will look at the sink mechanism for finding extremes in the year's closing prices, and at launching agents on a cron schedule.

That's all for today! Thanks for reading :)

Code for this part

P.S. Under the previous part I was asked about faust and confluent kafka (what features does confluent offer?). Confluent does seem to be more functional in many respects, but the fact is that faust does not have full client support for confluent; this follows from the description of client limitations in the documentation.

Source: www.habr.com
