Faust bo'yicha asosiy vazifalar, II qism: Agentlar va jamoalar

Faust bo'yicha asosiy vazifalar, II qism: Agentlar va jamoalar

Mundarija

  1. I qism: Kirish

  2. II qism: Agentlar va jamoalar

Bu yerda nima qilyapmiz?

Shunday qilib, ikkinchi qism. Yuqorida yozilganidek, unda biz quyidagilarni qilamiz:

  1. Bizga kerakli so'nggi nuqtalar uchun so'rovlar bilan aiohttp-da alfavantage uchun kichik mijoz yozamiz.

  2. Keling, qimmatli qog'ozlar va ular bo'yicha meta-ma'lumotlar to'g'risidagi ma'lumotlarni to'playdigan agent yarataylik.

Ammo, biz loyihaning o'zi uchun nima qilamiz va faust tadqiqoti nuqtai nazaridan, biz kafkadan oqim hodisalarini qayta ishlaydigan agentlarni qanday yozishni, shuningdek buyruqlarni qanday yozishni (o'rashni bosish), bizning holatlarimizda - agent kuzatayotgan mavzuga qo'lda surish xabarlari uchun.

o'quv

AlphaVantage mijozi

Birinchidan, alfavantga so'rovlar uchun kichik aiohttp mijozini yozamiz.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Aslida, undan hamma narsa aniq:

  1. AlphaVantage API juda sodda va chiroyli tarzda ishlab chiqilgan, shuning uchun men barcha so'rovlarni ushbu usul orqali amalga oshirishga qaror qildim construct_query bu erda o'z navbatida http chaqiruvi mavjud.

  2. Men barcha dalalarni olib kelaman snake_case qulaylik uchun.

  3. Xo'sh, chiroyli va ma'lumot beruvchi traceback chiqishi uchun logger.catch bezak.

P.S. Alfavantage tokenini mahalliy ravishda config.yml ga qo'shishni yoki muhit o'zgaruvchisini eksport qilishni unutmang. HORTON_SERVICE_APIKEY. Biz token olamiz shu yerda.

CRUD klassi

Qimmatli qog'ozlar haqidagi meta-ma'lumotni saqlash uchun qimmatli qog'ozlar to'plamiga ega bo'lamiz.

ma'lumotlar bazasi/security.py

Menimcha, bu erda hech narsani tushuntirishning hojati yo'q va asosiy sinfning o'zi juda oddiy.

get_app()

Ilova obyektini yaratish funksiyasini qo'shamiz app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Hozircha bizda eng oddiy dastur yaratish bo'ladi, birozdan keyin uni kengaytiramiz, ammo sizni kuttirmaslik uchun bu erda havolalar Ilova sinfiga. Shuningdek, sozlamalar sinfini ko'rib chiqishni maslahat beraman, chunki u ko'pgina sozlamalar uchun javobgardir.

Asosiy qism

Qimmatli qog'ozlar ro'yxatini yig'ish va yuritish bo'yicha agent

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Shunday qilib, birinchi navbatda biz faust dastur ob'ektini olamiz - bu juda oddiy. Keyinchalik, biz agentimiz uchun mavzuni aniq e'lon qilamiz ... Bu erda nima ekanligini, ichki parametr nima ekanligini va buni qanday qilib boshqacha tartibga solish mumkinligini eslatib o'tish kerak.

  1. Kafkadagi mavzular, agar aniq ta'rifni bilmoqchi bo'lsak, o'qiganimiz ma'qul o'chirilgan. hujjat, yoki siz o'qishingiz mumkin kompendium Habré-da rus tilida, u erda hamma narsa juda aniq aks ettirilgan :)

  2. Ichki parametr, faust doc-da juda yaxshi tasvirlangan, mavzuni to'g'ridan-to'g'ri kodda sozlash imkonini beradi, albatta, bu faust ishlab chiquvchilari tomonidan taqdim etilgan parametrlarni anglatadi, masalan: saqlash, saqlash siyosati (odatda o'chirish, lekin siz o'rnatishingiz mumkin) zich), mavzu bo'yicha bo'limlar soni (qismlarmasalan, kamroq qilish global ahamiyatga ega ilovalar faust).

  3. Umuman olganda, agent global qadriyatlar bilan boshqariladigan mavzuni yaratishi mumkin, ammo men hamma narsani aniq e'lon qilishni yaxshi ko'raman. Bundan tashqari, agent reklamasidagi mavzuning ba'zi parametrlarini (masalan, bo'limlar soni yoki saqlash siyosati) sozlab bo'lmaydi.

    Mavzuni qo'lda belgilamasdan turib u qanday ko'rinishi mumkin:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Xo'sh, endi agentimiz nima qilishini tasvirlab beraylik :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Shunday qilib, agentning boshida biz mijozimiz orqali so'rovlar uchun aiohttp sessiyasini ochamiz. Shunday qilib, ishchini ishga tushirganda, bizning agentimiz ishga tushirilganda, darhol sessiya ochiladi - bitta, ishchi butun vaqt davomida (yoki bir nechta, agar siz parametrni o'zgartirsangiz). bir vaqtda standart birlikka ega agentdan).

Keyinchalik, biz oqimni kuzatib boramiz (biz xabarni joylashtiramiz _, chunki biz ushbu agentda mavzuimizdagi xabarlarning mazmuni haqida qayg'urmaymiz, agar ular joriy ofsetda mavjud bo'lsa, aks holda bizning tsiklimiz ularning kelishini kutadi. Xo'sh, bizning tsiklimiz ichida biz xabarni qabul qilishni qayd qilamiz, faol (get_securities faqat sukut bo'yicha faol bo'ladi, mijoz kodini ko'ring) ro'yxatini olamiz va uni ma'lumotlar bazasiga saqlaymiz, xuddi shu ticker bilan xavfsizlik mavjudligini tekshiramiz va ma'lumotlar bazasida almashinuv , agar mavjud bo'lsa, u (qog'oz) shunchaki yangilanadi.

Keling, ijodimizni boshlaymiz!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

P.S. Imkoniyatlar veb komponenti Men maqolalarda faustni ko'rib chiqmayman, shuning uchun biz tegishli bayroqni o'rnatdik.

Ishga tushirish buyrug'imizda biz faustga dastur ob'ektini qayerdan qidirishni va u bilan nima qilish kerakligini (ishchini ishga tushirish) ma'lumotlar jurnalining chiqish darajasi bilan aytdik. Biz quyidagi natijani olamiz:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Bu tirik !!!

Keling, bo'limlar to'plamini ko'rib chiqaylik. Ko'rib turganimizdek, mavzu biz kodda belgilagan nom bilan yaratilgan, standart bo'limlar soni (8, dan olingan). mavzu_bo'limlari - dastur ob'ekti parametri), chunki biz mavzuimiz uchun individual qiymatni belgilamadik (bo'limlar orqali). Ishchida ishga tushirilgan agentga barcha 8 ta bo'lim tayinlangan, chunki u yagona, ammo bu klasterlash bo'limida batafsilroq muhokama qilinadi.

Xo'sh, endi biz boshqa terminal oynasiga o'tishimiz va mavzuimizga bo'sh xabar yuborishimiz mumkin:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. yordamida @ biz "to'plash_qimmatli qog'ozlar" nomli mavzuga xabar yuborayotganimizni ko'rsatamiz.

Bunday holda, xabar 6-bo'limga o'tdi - buni kafdrop-ga o'tish orqali tekshirishingiz mumkin localhost:9000

Ishchimiz bilan terminal oynasiga borib, loguru yordamida yuborilgan baxtli xabarni ko'ramiz:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Shuningdek, biz mongo-ga (Robo3T yoki Studio3T-dan foydalanib) qarashimiz va qimmatli qog'ozlar ma'lumotlar bazasida ekanligini ko'rishimiz mumkin:

Men milliarder emasman, shuning uchun biz birinchi ko'rish variantidan mamnunmiz.

Faust bo'yicha asosiy vazifalar, II qism: Agentlar va jamoalarFaust bo'yicha asosiy vazifalar, II qism: Agentlar va jamoalar

Baxt va quvonch - birinchi agent tayyor :)

Agent tayyor, yangi agent yashasin!

Ha, janoblar, biz ushbu maqola tomonidan tayyorlangan yo'lning atigi 1/3 qismini bosib o'tdik, ammo tushkunlikka tushmang, chunki endi bu osonroq bo'ladi.

Endi bizga meta-ma'lumotni to'playdigan va uni yig'ish hujjatiga joylashtiradigan agent kerak:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Ushbu agent ma'lum bir qimmatli qog'oz haqidagi ma'lumotlarni qayta ishlaganligi sababli, biz xabarda ushbu qimmatli qog'ozning belgisini (ramzini) ko'rsatishimiz kerak. Buning uchun faustda mavjud Records — agent mavzusida xabarlar sxemasini e'lon qiluvchi sinflar.

Bunday holda, keling records.py va ushbu mavzu uchun xabar qanday ko'rinishini tasvirlab bering:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Siz taxmin qilganingizdek, faust xabar sxemasini tavsiflash uchun python tipidagi izohdan foydalanadi, shuning uchun kutubxona tomonidan qo'llab-quvvatlanadigan minimal versiya. 3.6.

Keling, agentga qaytaylik, turlarni o'rnatamiz va uni qo'shamiz:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Ko'rib turganingizdek, mavzuni ishga tushirish usuliga sxema bilan yangi parametrni o'tkazamiz - value_type. Bundan tashqari, hamma narsa bir xil sxema bo'yicha amalga oshiriladi, shuning uchun men boshqa narsa haqida o'ylashning ma'nosini ko'rmayapman.

Yakuniy teginish - bu collect_securitites uchun meta-ma'lumot yig'ish agentiga qo'ng'iroq qilish:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Biz xabar uchun avval e'lon qilingan sxemadan foydalanamiz. Bu holatda men .cast usulidan foydalandim, chunki agentdan natija kutishimiz shart emas, lekin shuni ta'kidlash joizki yo'llari mavzuga xabar yuboring:

  1. quyma - bloklanmaydi, chunki u natija kutmaydi. Natijani boshqa mavzuga xabar sifatida yubora olmaysiz.

  2. yuborish - bloklanmaydi, chunki u natija kutmaydi. Natija ketadigan mavzuda agentni belgilashingiz mumkin.

  3. so'rang - natijani kutadi. Natija ketadigan mavzuda agentni belgilashingiz mumkin.

Demak, agentlar bilan bugun hammasi!

Orzular jamoasi

Bu qismda yozishga va'da bergan oxirgi narsa - bu buyruqlar. Yuqorida aytib o'tilganidek, faust-dagi buyruqlar bosish atrofida o'ralgan. Haqiqatan ham, faust -A tugmachasini ko'rsatganda, bizning maxsus buyruqimizni o'z interfeysiga biriktiradi

E'lon qilingan agentlardan keyin agents.py dekorator bilan funksiya qo'shing app.commandusulni chaqirish quyma у yig'ish_xavfsizliklari:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Shunday qilib, agar biz buyruqlar ro'yxatini chaqirsak, unda bizning yangi buyruqimiz bo'ladi:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Biz uni boshqalar kabi ishlatishimiz mumkin, shuning uchun faust ishchisini qayta ishga tushiramiz va qimmatli qog'ozlarning to'liq to'plamini boshlaymiz:

> faust -A horton.agents start-collect-securities

Keyingi nima bo'ladi?

Keyingi qismda, qolgan agentlardan misol sifatida foydalanib, biz yil davomida savdoning yopilish narxlarida ekstremallarni qidirish mexanizmini va agentlarni ishga tushirishni ko'rib chiqamiz.

Bugun hammasi shu! O'qiganingiz uchun rahmat :)

Ushbu qism uchun kod

Faust bo'yicha asosiy vazifalar, II qism: Agentlar va jamoalar

P.S. Oxirgi qism ostida mendan faust va qo'shilgan kafka haqida so'rashdi (konfluent qanday xususiyatlarga ega?). Ko'rinishidan, konfluent ko'p jihatdan ko'proq funktsionaldir, ammo haqiqat shundaki, faust konfluent uchun mijozlarni to'liq qo'llab-quvvatlamaydi - bu shundan kelib chiqadi. hujjatdagi mijoz cheklovlarining tavsifi.

Manba: www.habr.com

a Izoh qo'shish