Faust üzrə əsas tapşırıqlar, II hissə: Agentlər və Komandalar

Faust üzrə əsas tapşırıqlar, II hissə: Agentlər və Komandalar

Mündəricat

  1. I hissə: Giriş

  2. II hissə: Agentlər və Komandalar

Bizim burada nə işimiz var?

Beləliklə, ikinci hissə. Daha əvvəl yazıldığı kimi, onda aşağıdakıları edəcəyik:

  1. Bizə lazım olan son nöqtələr üçün sorğularla aiohttp-də alfavantage üçün kiçik bir müştəri yazaq.

  2. Qiymətli kağızlar haqqında məlumat və onlar haqqında meta məlumat toplayacaq agent yaradaq.

Ancaq bu, layihənin özü üçün edəcəyimiz şeydir və faust araşdırması baxımından, biz kafkadan axın hadisələrini emal edən agentləri necə yazmağı, həmçinin əmrləri necə yazmağı (klik sarğı) öyrənəcəyik, bizim vəziyyətimizdə - agentin nəzarət etdiyi mövzuya manuel təkan mesajları üçün.

Təlim

AlphaVantage Müştəri

Əvvəlcə alfavantage sorğuları üçün kiçik bir aiohttp müştəri yazaq.

alphavantage.py

spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Əslində ondan hər şey aydındır:

  1. AlphaVantage API olduqca sadə və gözəl dizayn edilmişdir, ona görə də bütün sorğuları metod vasitəsilə etmək qərarına gəldim construct_query burada öz növbəsində http çağırışı var.

  2. Mən bütün sahələri gətirirəm snake_case rahatlıq üçün.

  3. Yaxşı, gözəl və informativ izləmə çıxışı üçün logger.catch dekorasiyası.

PS Alfavantage işarəsini yerli olaraq config.yml-ə əlavə etməyi və ya mühit dəyişənini ixrac etməyi unutmayın. HORTON_SERVICE_APIKEY. Token alırıq burada.

CRUD sinfi

Qiymətli kağızlar haqqında meta məlumat saxlamaq üçün qiymətli kağızlar kolleksiyamız olacaq.

verilənlər bazası/security.py

Fikrimcə, burada heç nə izah etməyə ehtiyac yoxdur və baza sinfinin özü olduqca sadədir.

get_app()

Tətbiq obyekti yaratmaq üçün funksiya əlavə edək app.py

spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Hələlik bizdə ən sadə proqram yaradılması olacaq, bir az sonra onu genişləndirəcəyik, lakin sizi gözləməmək üçün burada istinadlar Tətbiq sinfinə. Mən də sizə parametrlər sinfinə nəzər salmağı məsləhət görürəm, çünki o, əksər parametrlərə cavabdehdir.

Əsas hissəsidir

Qiymətli kağızların siyahısının toplanması və aparılması üzrə agent

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Beləliklə, əvvəlcə faust tətbiq obyektini alırıq - bu olduqca sadədir. Sonra, biz agentimiz üçün açıq şəkildə bir mövzu elan edirik... Burada onun nə olduğunu, daxili parametrin nə olduğunu və bunun necə fərqli şəkildə təşkil oluna biləcəyini qeyd etməyə dəyər.

  1. Kafkadakı mövzular, dəqiq tərifini bilmək istəyiriksə, oxumaq daha yaxşıdır off. sənədvə ya oxuya bilərsiniz məcmuə Rus dilində Habré-də, burada hər şey olduqca dəqiq əks olunur :)

  2. Daxili parametr, faust doc-da olduqca yaxşı təsvir edilmişdir, mövzunu birbaşa kodda konfiqurasiya etməyə imkan verir, əlbəttə ki, bu, faust tərtibatçıları tərəfindən verilən parametrlər deməkdir, məsələn: saxlama, saxlama siyasəti (standart olaraq silmək, ancaq təyin edə bilərsiniz. yığcam), mövzu üzrə bölmələrin sayı (puanlarıetmək, məsələn, az etmək qlobal əhəmiyyəti proqramlar faust).

  3. Ümumiyyətlə, agent qlobal dəyərlərlə idarə olunan mövzu yarada bilər, lakin mən hər şeyi açıq şəkildə bəyan etməyi xoşlayıram. Bundan əlavə, agent reklamındakı mövzunun bəzi parametrləri (məsələn, bölmələrin sayı və ya saxlama siyasəti) konfiqurasiya edilə bilməz.

    Mövzunu əl ilə təyin etmədən belə görünə bilər:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Yaxşı, indi agentimizin nə edəcəyini təsvir edək :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Beləliklə, agentin başlanğıcında müştərimiz vasitəsilə sorğular üçün aiohttp sessiyası açırıq. Beləliklə, bir işçi işə başladıqda, agentimiz işə salındıqda dərhal bir sessiya açılacaq - bir, işçinin işlədiyi bütün müddət üçün (və ya parametri dəyişdirsəniz, bir neçə). uyğunluq standart vahidi olan agentdən).

Sonra, axını izləyirik (mesajı yerləşdiririk _, çünki biz bu agentdə mövzumuzdan gələn mesajların məzmununa əhəmiyyət vermədiyimiz üçün, əgər onlar cari ofsetdə varsa, əks halda dövrümüz onların gəlişini gözləyəcək. Yaxşı, döngəmizdə mesajın alınmasını qeyd edirik, aktiv (get_securities yalnız default olaraq aktivdir, müştəri koduna baxın) qiymətli kağızların siyahısını əldə edirik və eyni ticker və qiymətli kağızın olub olmadığını yoxlayaraq verilənlər bazasında saxlayırıq. verilənlər bazasında mübadilə , əgər varsa, o (kağız) sadəcə yenilənəcəkdir.

Yaradıcılığımıza başlayaq!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS Xüsusiyyətləri veb komponenti Məqalələrdə faustu nəzərdən keçirməyəcəyəm, ona görə də müvafiq bayrağı qoyduq.

Başlatma əmrimizdə biz faust-a məlumat jurnalının çıxış səviyyəsi ilə tətbiq obyektini harada axtarmaq və onunla nə etmək lazım olduğunu (işçini işə salmaq) söylədik. Aşağıdakı çıxışı alırıq:

spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

O canlıdır!!!

Bölmə dəstinə baxaq. Gördüyümüz kimi, kodda təyin etdiyimiz adla, standart bölmələrin sayı (8, götürülmüşdür) ilə bir mövzu yaradıldı. mövzu_bölmələri - tətbiq obyekti parametri), mövzumuz üçün fərdi dəyər təyin etmədiyimiz üçün (arakəsmələr vasitəsilə). İşçidə işə salınan agentə bütün 8 arakəsmə təyin olunur, çünki bu, yeganədir, lakin bu, klasterləşmə ilə bağlı hissədə daha ətraflı müzakirə olunacaq.

Yaxşı, indi başqa bir terminal pəncərəsinə keçib mövzumuza boş bir mesaj göndərə bilərik:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS istifadə edir @ biz “qiymətli kağızları toplamaq” adlı mövzuya mesaj göndərdiyimizi göstəririk.

Bu halda mesaj 6-cı bölməyə getdi - kafdrop-a keçərək bunu yoxlaya bilərsiniz localhost:9000

İşçimizlə terminal pəncərəsinə gedərkən loguru istifadə edərək göndərilən xoşbəxt mesajı görəcəyik:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Biz həmçinin mongo-ya (Robo3T və ya Studio3T istifadə edərək) baxa bilərik və qiymətli kağızların verilənlər bazasında olduğunu görə bilərik:

Mən milyarder deyiləm və buna görə də birinci baxış seçimi ilə kifayətlənirik.

Faust üzrə əsas tapşırıqlar, II hissə: Agentlər və KomandalarFaust üzrə əsas tapşırıqlar, II hissə: Agentlər və Komandalar

Xoşbəxtlik və sevinc - ilk agent hazırdır :)

Agent hazır, yaşasın yeni agent!

Bəli, cənablar, biz bu məqalənin hazırladığı yolun yalnız 1/3-ni keçdik, amma ruhdan düşməyin, çünki indi daha asan olacaq.

Beləliklə, indi bizə meta məlumat toplayan və onu kolleksiya sənədinə qoyan agent lazımdır:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Bu agent konkret qiymətli kağız haqqında məlumatları emal edəcəyi üçün mesajda bu qiymətli kağızın işarəsini (simvolunu) göstərməliyik. Bu məqsədlə faustda var Records — agent mövzusunda mesaj sxemini elan edən siniflər.

Bu vəziyyətdə, gedək qeydlər.py və bu mövzu üçün mesajın necə görünəcəyini təsvir edin:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Təxmin etdiyiniz kimi, faust mesaj sxemini təsvir etmək üçün python tipli annotasiyadan istifadə edir, buna görə də kitabxana tərəfindən dəstəklənən minimum versiya 3.6.

Agentə qayıdaq, növləri təyin edək və əlavə edək:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Gördüyünüz kimi, mövzunun başlatma metoduna sxem ilə yeni bir parametr keçirik - dəyər_tipi. Bundan əlavə, hər şey eyni sxemə uyğundur, buna görə də başqa bir şey üzərində dayanmağın mənasını görmürəm.

Yaxşı, son toxunuş, collect_securitites üçün meta məlumat toplama agentinə zəng əlavə etməkdir:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Mesaj üçün əvvəlcədən elan edilmiş sxemdən istifadə edirik. Bu halda mən .cast metodundan istifadə etdim, çünki agentdən nəticə gözləməyə ehtiyac yoxdur, lakin qeyd etmək lazımdır ki, yolları mövzuya mesaj göndərin:

  1. tökmə - nəticə gözləmədiyi üçün blok etmir. Nəticəni başqa mövzuya mesaj olaraq göndərə bilməzsiniz.

  2. göndər - nəticə gözləmədiyi üçün blok etmir. Nəticənin gedəcəyi mövzuda agent təyin edə bilərsiniz.

  3. soruşmaq - nəticəni gözləyir. Nəticənin gedəcəyi mövzuda agent təyin edə bilərsiniz.

Beləliklə, bu gün agentlərlə olan hər şey budur!

Xəyal Komandası

Bu hissədə yazmağa söz verdiyim son şey əmrlərdir. Daha əvvəl qeyd edildiyi kimi, faustdakı əmrlər klik ətrafında bir sarğıdır. Əslində, faust sadəcə olaraq -A düyməsini təyin edərkən bizim xüsusi əmrimizi onun interfeysinə əlavə edir

Agentlər içəri girdikdən sonra elan edildi agents.py dekorator ilə funksiya əlavə edin app.commandmetodu çağırır tökmə у təhlükəsizlikləri_toplayın:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Beləliklə, əmrlərin siyahısını çağırsaq, yeni əmrimiz onda olacaq:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Biz də hər kəs kimi istifadə edə bilərik, ona görə də gəlin faust işçisini yenidən işə salaq və qiymətli kağızların tam hüquqlu kolleksiyasına başlayaq:

> faust -A horton.agents start-collect-securities

Bundan sonra nə olacaq?

Növbəti hissədə, qalan agentlərdən nümunə kimi istifadə edərək, il üçün ticarətin bağlanış qiymətlərində ifratların axtarışı üçün sink mexanizmini və agentlərin cron işə salınmasını nəzərdən keçirəcəyik.

Bu gün üçün hamısı budur! Oxuduğunuz üçün təşəkkürlər :)

Bu hissə üçün kod

Faust üzrə əsas tapşırıqlar, II hissə: Agentlər və Komandalar

P.S. Son hissənin altında məndən faust və birləşən kafka haqqında soruşdular (konfluent hansı xüsusiyyətlərə malikdir?). Görünür, birləşən bir çox cəhətdən daha funksionaldır, amma fakt budur ki, faust birləşən üçün tam müştəri dəstəyinə malik deyil - bundan belə çıxır. sənəddə müştəri məhdudiyyətlərinin təsviri.

Mənbə: www.habr.com

Добавить комментарий