Background tasks on Faust, Part II: Agents and Commands

Table of contents

  1. Part I: Introduction

  2. Part II: Agents and Commands

What are we doing here?

So, part two. As announced earlier, in this part we will do the following:

  1. Write a small client for alphavantage on aiohttp, with requests to the endpoints we need.

  2. Create an agent that collects data on securities and their meta information.

That is what we will build for the project itself. As far as learning faust goes, we will see how to write agents that process stream events from kafka, and how to write commands (a wrapper around click), which in our case push messages by hand to the topic the agent is watching.

Preparation

AlphaVantage Client

First, let's write a small aiohttp client for requests to alphavantage.

alphavantage.py

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Actually, everything should be clear from the code:

  1. The AlphaVantage API is simple and well designed, so I decided to route all requests through the construct_query method, which is where the http call itself happens.

  2. I convert all field names to snake_case for convenience.

  3. And the logger.catch decorator is there for nice, informative traceback output.
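To make the second point concrete, here is a minimal stdlib-only sketch of the kind of key normalization that _format_fields performs. The helper to_snake_case is a hypothetical stand-in, not the stringcase implementation (edge cases may differ), and the sample keys are assumed for illustration rather than copied from a real response.

```python
import re
from typing import Any, Dict


def to_snake_case(name: str) -> str:
    """Rough approximation of snake_case conversion (not stringcase itself)."""
    # Treat runs of whitespace, dots and dashes as word separators.
    name = re.sub(r"[\s.\-]+", "_", name.strip())
    # Insert an underscore at every lowercase/digit -> uppercase boundary.
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    return name.lower()


def format_fields(data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of data with every key converted to snake_case."""
    return {to_snake_case(key): value for key, value in data.items()}


# Sample keys are assumptions made up for this sketch.
sample = {"ipoDate": "1999-11-18", "assetType": "Stock", "symbol": "A"}
formatted = format_fields(sample)
```

Normalizing keys once, at the client boundary, means the agents and the database layer only ever see one naming style.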

P.S. Don't forget to add your alphavantage token locally to config.yml, or export the environment variable HORTON_SERVICE_APIKEY. You can get a token on the AlphaVantage website.
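The horton.config module is not shown in this part, so the following is only a guess at what the fallback between the two sources could look like. The function name resolve_api_key and the precedence (environment first, then the value read from config.yml) are assumptions.

```python
import os
from typing import Mapping, Optional

ENV_VAR = "HORTON_SERVICE_APIKEY"  # variable name taken from the article


def resolve_api_key(
    file_value: Optional[str] = None,
    environ: Mapping[str, str] = os.environ,
) -> str:
    """Pick the API key from the environment, else from the config file value.

    The precedence used here is an assumption, not the project's documented
    behavior. file_value stands for whatever was read from config.yml.
    """
    key = environ.get(ENV_VAR) or file_value
    if not key:
        raise RuntimeError(f"Set {ENV_VAR} or add the key to config.yml")
    return key
```

Failing loudly at startup is usually preferable to sending requests with an empty key and debugging the API's error responses later.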

CRUD class

We will have a securities collection to store meta information about securities.

database/security.py

In my opinion there is nothing to explain here, and the base class itself is quite simple.

get_app()

Let's add a function that creates the application object to app.py

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

For now we have the simplest possible application setup; we will extend it later. So as not to keep you waiting, though, see the faust documentation for the App class. I also recommend looking at the settings class, since it is responsible for most of the settings.

Main part

Agent for collecting and storing the list of securities

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    pass

So, first we get the faust application object, which is quite simple. Next, we explicitly declare a topic for our agent... Here it is worth explaining what a topic is, what the internal parameter does, and how this could be arranged differently.

  1. Topics in kafka: if you want the exact definition, it is best to read the official documentation, or you can read the summary on Habr in Russian, where everything is also described quite accurately :)

  2. The internal parameter, described quite well in the faust docs, lets us configure the topic directly in code. Naturally, this covers only the parameters exposed by the faust developers, for example: retention, the retention policy (delete by default, but you can set compact), and the number of partitions per topic (partitions, for example to make it smaller than the global value of the faust application).

  3. In general, an agent can create a managed topic by itself using the global values; however, I like to declare everything explicitly. In addition, some topic parameters (for example, the number of partitions or the retention policy) cannot be configured in the agent declaration.

    Here is what it might look like without manually defining the topic:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    pass
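For contrast, here is a hedged sketch of an explicit declaration that overrides some of the settings mentioned above. The helper name is hypothetical and the concrete values are illustrative assumptions; partitions, retention, compacting and deleting are, to my knowledge, keyword arguments of app.topic, but check them against the faust version you use.

```python
from datetime import timedelta


def declare_collect_securities_topic(app):
    """Declare the agent's topic with explicit per-topic settings.

    `app` is expected to be the faust.App returned by get_app().
    All concrete values below are illustrative assumptions.
    """
    return app.topic(
        "collect_securities",
        internal=True,                # let faust create and manage the topic
        partitions=4,                 # override the app-wide topic_partitions
        retention=timedelta(days=1),  # how long messages are kept
        compacting=False,             # True would switch on log compaction
        deleting=True,                # keep the default "delete" cleanup policy
    )
```

The returned topic would then be passed to @app.agent(...) exactly as collect_securities_topic is in the article.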

Well, now let's describe what our agent will actually do :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

So, at the start of the agent we open an aiohttp session for the requests made through our client. This means that when the worker starts and our agent is launched, a session is opened right away: a single one for the whole lifetime of the worker (or several, if you change the agent's concurrency parameter from its default value of one).
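The "one session per agent instance" point can be illustrated without faust at all. The following is a toy simulation, not faust code: it runs several copies of the same agent-shaped coroutine over a shared queue and counts how many times the (fake) session is opened. All names in it are hypothetical.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import List, Tuple


async def simulate_agent(concurrency: int, messages: List[str]) -> Tuple[int, List[str]]:
    """Run `concurrency` copies of one agent body over a shared queue.

    Returns how many sessions were opened and which messages were handled.
    """
    queue: asyncio.Queue = asyncio.Queue()
    opened = 0
    handled: List[str] = []

    @asynccontextmanager
    async def fake_session():
        # Stand-in for aiohttp.ClientSession(): only counts how often it is opened.
        nonlocal opened
        opened += 1
        yield object()

    async def agent_body() -> None:
        # Same shape as the real agent: session outside, message loop inside.
        async with fake_session():
            while True:
                message = await queue.get()
                if message is None:  # sentinel: stop this instance
                    return
                handled.append(message)

    for message in messages:
        queue.put_nowait(message)
    for _ in range(concurrency):
        queue.put_nowait(None)  # one stop signal per instance

    await asyncio.gather(*(agent_body() for _ in range(concurrency)))
    return opened, handled


sessions, handled = asyncio.run(simulate_agent(concurrency=2, messages=["a", "b", "c"]))
```

With concurrency=2 the session context is entered twice, once per instance, no matter how many messages flow through the loop.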

Next, we iterate over the stream of messages from our topic (we bind each message to _, since this agent does not care about its content), if there are any at the current offset; otherwise the loop waits for them to arrive. Inside the loop we log that a message was received, fetch the list of active securities (get_securities returns only active ones by default, see the client code) and save it to the database, checking whether a security with the same ticker and exchange already exists there; if it does, that security is simply updated.
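Since the SecurityCRUD code is not reproduced here, the update-or-insert behavior can be sketched with an in-memory stand-in. InMemorySecurityCRUD is hypothetical: it only mimics the update_one(query, payload, upsert=...) call shape used by the agent and says nothing about how the real class talks to the database.

```python
import asyncio
from typing import Any, Dict, List


class InMemorySecurityCRUD:
    """Hypothetical stand-in for SecurityCRUD that keeps documents in a list."""

    def __init__(self) -> None:
        self.documents: List[Dict[str, Any]] = []

    async def update_one(
        self, query: Dict[str, Any], payload: Dict[str, Any], upsert: bool = False
    ) -> bool:
        """Update the first document matching query; insert it when upsert is set."""
        for document in self.documents:
            if all(document.get(key) == value for key, value in query.items()):
                document.update(payload)
                return True
        if upsert:
            self.documents.append({**query, **payload})
            return True
        return False


async def demo() -> List[Dict[str, Any]]:
    crud = InMemorySecurityCRUD()
    key = {"symbol": "IBM", "exchange": "NYSE"}
    # First call finds nothing and inserts; second call updates the same document.
    await crud.update_one(key, {**key, "name": "IBM"}, upsert=True)
    await crud.update_one(key, {**key, "name": "International Business Machines"}, upsert=True)
    return crud.documents


documents = asyncio.run(demo())
```

Because the pair (symbol, exchange) acts as the lookup key, rerunning the collection does not duplicate securities.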

Let's launch our creation!

> docker-compose up -d
... Starting containers ...
> faust -A horton.agents worker --without-web -l info

P.S. I will not cover the features of the web component in these articles, so we set the corresponding flag (--without-web).

In our launch command we told faust where to look for the application object and what to do with it (start a worker), with the log output level set to info. We get the following output:

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... logs, logs, logs ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

It's alive!!!

Let's look at the partition set. As we can see, a topic was created with the name we specified in code and with the default number of partitions (8, taken from topic_partitions, a parameter of the application object), because we did not set an individual value for our topic (via partitions). The agent launched in the worker was assigned all 8 partitions, since it is the only one, but this will be discussed in more detail in the part about clustering.

Well, now we can switch to another terminal window and send an empty message to our topic:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. The @ prefix tells faust that the target is the agent named "collect_securities"; the message is then delivered to the topic that agent reads from.

In this case the message went to partition 6. You can verify this by opening kafdrop at localhost:9000

Switching to the terminal window with our worker, we will see a happy message logged via loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

We can also look into mongo (using Robo3T or Studio3T) and see that the securities are in the database:

I'm no billionaire, so I make do with the first viewing option.

Happiness and joy: the first agent is ready :)

The agent is ready, long live the new agent!

Yes, gentlemen, we have covered only a third of the path planned for this article, but don't despair, because it gets easier from here.

So now we need an agent that collects meta information and stores it in the collection document:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Since this agent will process information about one specific security, we need to pass that security's ticker (symbol) in the message. For this purpose faust has Records: classes that declare the message schema for the agent's topic.

So let's go to records.py and describe what a message for this topic should look like:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

As you might guess, faust uses python type annotations to describe the message schema, which is why the minimum Python version supported by the library is 3.6.
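To show the general idea of an annotation-driven schema, here is a toy base class built only on the standard library. It is not how faust.Record is implemented; TinyRecord and CollectSecurityOverviewSketch are hypothetical names, and the validation is deliberately simplistic (plain isinstance checks on simple types).

```python
from typing import Any, Dict, get_type_hints


class TinyRecord:
    """Toy base class: builds and validates instances from class annotations."""

    def __init__(self, **fields: Any) -> None:
        hints = get_type_hints(type(self))
        missing = set(hints) - set(fields)
        if missing:
            raise TypeError(f"missing fields: {sorted(missing)}")
        for name, expected in hints.items():
            value = fields[name]
            if not isinstance(value, expected):
                raise TypeError(f"{name} must be {expected.__name__}")
            setattr(self, name, value)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the declared fields, in declaration order."""
        return {name: getattr(self, name) for name in get_type_hints(type(self))}


class CollectSecurityOverviewSketch(TinyRecord):
    # Same fields as the article's record; the annotations are the schema.
    symbol: str
    exchange: str


message = CollectSecurityOverviewSketch(symbol="IBM", exchange="NYSE")
payload = message.to_dict()
```

The class body contains nothing but annotations, yet that is enough to know which fields a message must carry and of what type.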

Let's go back to the agent, set the types and finish it:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

As you can see, we pass a new parameter with the schema, value_type, to the topic initialization method. Beyond that, everything follows the same pattern, so I see no point in dwelling on anything else.

Well, the final touch is to add a call to the meta information agent inside collect_securities:

....
for security in securities:
    await SecurityCRUD.update_one(
        {"symbol": security["symbol"], "exchange": security["exchange"]},
        security,
        upsert=True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol=security["symbol"], exchange=security["exchange"])
    )
....

We use the previously declared schema for the message. Here I used the .cast method, since we do not need to wait for a result from the agent, but it is worth listing the ways to send a message to a topic:

  1. cast: non-blocking, since it does not expect a result. The result cannot be sent to another topic as a message.

  2. send: non-blocking, since it does not expect a result. You can specify an agent whose topic the result will be sent to.

  3. ask: waits for the result. You can specify an agent whose topic the result will be sent to.
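The difference between "fire and forget" and "wait for the reply" can be modeled with plain asyncio. ToyAgent below is a hypothetical toy, not faust's implementation: it has no topics, no serialization and no reply_to routing (so send is not modeled), and only shows why cast returns immediately while ask suspends until the handler has produced a result.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional, Tuple


class ToyAgent:
    """Toy model of an agent inbox; not faust's implementation."""

    def __init__(self, handler: Callable[[Any], Awaitable[Any]]) -> None:
        self._handler = handler
        self._inbox: asyncio.Queue = asyncio.Queue()

    async def cast(self, value: Any) -> None:
        # Fire and forget: enqueue the message, nobody waits for the result.
        await self._inbox.put((value, None))

    async def ask(self, value: Any) -> Any:
        # Request/reply: enqueue the message with a future and wait on it.
        reply: asyncio.Future = asyncio.get_running_loop().create_future()
        await self._inbox.put((value, reply))
        return await reply

    async def run(self) -> None:
        # Process messages one by one, resolving the future when one was attached.
        while True:
            value, reply = await self._inbox.get()
            result = await self._handler(value)
            if reply is not None:
                reply.set_result(result)


async def demo() -> Tuple[Optional[Any], str]:
    async def handler(symbol: str) -> str:
        return symbol.lower()

    agent = ToyAgent(handler)
    worker = asyncio.create_task(agent.run())
    cast_result = await agent.cast("IBM")  # returns right away with None
    ask_result = await agent.ask("AAPL")   # resumes once the handler replied
    worker.cancel()
    return cast_result, ask_result


cast_result, ask_result = asyncio.run(demo())
```

In the article's case cast is the natural choice: collect_securities only needs to hand each security over and move on to the next one.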

So, that's all about agents for today!

Dream team

The last thing I promised to cover in this part is commands. As mentioned earlier, commands in faust are a wrapper around click. In effect, faust simply attaches our custom command to its own interface when the -A option is given.

After the agents declared in agents.py, add a function with the app.command decorator that calls the cast method of collect_securities:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Thus, if we list the available commands, our new command will be among them:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

We can use it like any other command, so let's restart the faust worker and kick off a full collection of securities:

> faust -A horton.agents start-collect-securities

What happens next?

In the next part, using the remaining agents as an example, we will look at the sink mechanism for finding extremes in the year's closing prices, and at launching agents on a cron schedule.

That's all for today! Thanks for reading :)

Code for this part

P.S. Under the previous part I was asked about faust and confluent kafka (which features does confluent have?). It seems that confluent is more capable in many respects, but the fact is that faust does not have full client support for confluent; this follows from the description of the client restrictions in the docs.

Source: www.habr.com
