Фауст, II хэсэг: Агентууд ба багуудын талаархи үндсэн даалгавар

Фауст, II хэсэг: Агентууд ба багуудын талаархи үндсэн даалгавар

Агуулга

  1. I хэсэг: Танилцуулга

  2. II хэсэг: Агентууд ба багууд

Бид энд юу хийж байгаа юм бэ?

Тиймээс, хоёр дахь хэсэг. Өмнө нь бичсэнчлэн, үүнд бид дараахь зүйлийг хийх болно.

  1. Бидэнд хэрэгтэй төгсгөлийн цэгүүдийн хүсэлтийг aiohttp дээр alphavantage-д зориулсан жижиг клиент бичье.

  2. Үнэт цаасны талаарх мэдээлэл, тэдгээрийн мета мэдээлэл цуглуулах агент байгуулъя.

Гэхдээ энэ бол төслийн хувьд бид хийх зүйл бөгөөд фауст судалгааны хувьд бид кафкагаас урсгалын үйл явдлуудыг боловсруулдаг агентуудыг хэрхэн бичих, түүнчлэн командуудыг хэрхэн бичих талаар сурах болно (боодол дээр дарна уу), манай тохиолдолд - агентын хянаж буй сэдэв рүү гар аргаар түлхэх мессежийн хувьд.

Сургалт

AlphaVantage үйлчлүүлэгч

Эхлээд alphavantage-ийн хүсэлтэд зориулж жижиг aiohttp клиент бичье.

alphavantage.py

Хорлон сүйтгэгчид

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Үнэндээ бүх зүйл үүнээс тодорхой байна:

  1. AlphaVantage API нь маш энгийн бөгөөд гоёмсог дизайнтай тул би бүх хүсэлтийг энэ аргаар хийхээр шийдсэн construct_query хаана нь эргээд http дуудлага байна.

  2. Би бүх талбайг авчирдаг snake_case тав тухтай байдлын төлөө.

  3. За, сайхан, мэдээлэл сайтай traceback гаралт нь logger.catch чимэглэл.

Жич: config.yml-д альфавантын токеныг дотооддоо нэмэх эсвэл орчны хувьсагчийг экспортлохоо бүү мартаарай. HORTON_SERVICE_APIKEY. Бид тэмдэг хүлээн авдаг энд.

CRUD анги

Бид үнэт цаасны талаарх мета мэдээллийг хадгалах үнэт цаасны цуглуулгатай болно.

мэдээллийн сан/security.py

Миний бодлоор энд юу ч тайлбарлах шаардлагагүй бөгөөд үндсэн анги нь өөрөө маш энгийн.

авах_app()

Програмын объект үүсгэх функцийг нэмье app.py

Хорлон сүйтгэгчид

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Одоохондоо бид хамгийн энгийн програм үүсгэх болно, дараа нь бид үүнийг өргөжүүлэх болно, гэхдээ таныг хүлээхгүйн тулд энд лавлагаа Апп-ангилал руу. Ихэнх тохиргоог хариуцдаг тул тохиргооны ангиудыг харахыг би танд зөвлөж байна.

Үндсэн хэсэг

Үнэт цаасны жагсаалтыг цуглуулах, хөтлөх агент

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Тиймээс эхлээд бид faust програмын объектыг олж авдаг - энэ нь маш энгийн. Дараа нь бид төлөөлөгчдөө зориулсан сэдвийг тодорхой зарлаж байна ... Энд энэ нь юу болох, дотоод параметр нь юу болох, үүнийг хэрхэн өөрөөр зохион байгуулж болохыг дурдах нь зүйтэй юм.

  1. Кафка дахь сэдвүүд, хэрэв бид яг тодорхой тодорхойлолтыг мэдэхийг хүсвэл уншсан нь дээр унтраах. баримт бичиг, эсвэл та уншиж болно эмхэтгэл Хабре дээр орос хэл дээр, тэнд бүх зүйл маш нарийн тусгагдсан байдаг :)

  2. Дотоод параметр, faust doc-д маш сайн тайлбарласан нь бидэнд сэдвийг кодонд шууд тохируулах боломжийг олгодог, мэдээжийн хэрэг, энэ нь faust хөгжүүлэгчдийн өгсөн параметрүүдийг хэлнэ, жишээлбэл: хадгалах, хадгалах бодлого (анхдагчаар устгах, гэхдээ та тохируулж болно) компакт), сэдэв бүрийн хуваалтын тоо (хуваалтуудхийх, жишээ нь, түүнээс бага дэлхийн ач холбогдол програмууд faust).

  3. Ерөнхийдөө агент нь дэлхийн үнэ цэнэ бүхий удирддаг сэдвийг бий болгож чадна, гэхдээ би бүх зүйлийг тодорхой тунхаглах дуртай. Нэмж дурдахад, агентын сурталчилгааны сэдвийн зарим параметрүүдийг (жишээлбэл, хуваалтын тоо эсвэл хадгалах бодлого) тохируулах боломжгүй.

    Сэдвийг гараар тодорхойлохгүйгээр иймэрхүү харагдах болно:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

За, одоо манай агент юу хийхийг тайлбарлая :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Тиймээс, агентын эхэнд бид үйлчлүүлэгчээрээ дамжуулан хүсэлт гаргах aiohttp сессийг нээдэг. Тиймээс, ажилтныг эхлүүлэх үед манай агент нээгдэхэд тэр даруй сесс нээгдэнэ - нэг нь тухайн ажилчин ажиллаж байгаа бүх хугацаанд (эсвэл хэд хэдэн, хэрэв та параметрийг өөрчилбөл). нийцэл анхдагч нэгжтэй агентаас).

Дараа нь бид урсгалыг дагадаг (бид мессежийг байрлуулна _, учир нь бид энэ агентийн хувьд сэдэвийнхээ мессежүүдийн агуулгыг анхаарч үздэггүй, хэрэв тэдгээр нь одоогийн офсет дээр байгаа бол, эс тэгвээс бидний мөчлөг тэдний ирэлтийг хүлээх болно. За, манай гогцоонд бид мессеж хүлээн авснаа бүртгэж, идэвхтэй (get_securities нь зөвхөн өгөгдмөлөөр идэвхтэй буцдаг, үйлчлүүлэгчийн кодыг харна уу) үнэт цаасны жагсаалтыг авч, мэдээллийн санд хадгалдаг, ижил тэмдэгтэй үнэт цаас байгаа эсэхийг шалгадаг. мэдээллийн санд солилцох , хэрэв байгаа бол энэ нь (цаасан) зүгээр л шинэчлэгдэх болно.

Бүтээлээ эхлүүлцгээе!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS онцлогууд вэб бүрэлдэхүүн хэсэг Би нийтлэлүүдэд фаустыг авч үзэхгүй, тиймээс бид зохих тугийг тавьсан.

Бидний эхлүүлэх команд дээр бид faust-д програмын объектыг хаанаас хайх, түүнтэй юу хийхийг (ажилчин ажиллуулах) мэдээллийн бүртгэлийн гаралтын түвшинд зааж өгсөн. Бид дараах гаралтыг авна.

Хорлон сүйтгэгчид

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Энэ амьд!!!

Хуваалтын багцыг харцгаая. Бидний харж байгаагаар кодонд заасан нэрээр сэдэв үүсгэгдсэн, хуваалтын үндсэн тоо (8, сэдвийн_хуваалтууд - програмын объектын параметр), учир нь бид сэдэвийнхээ бие даасан утгыг заагаагүй (хуваалтуудаар). Ажилчинд эхлүүлсэн агент нь бүх 8 хуваалтыг хуваарилдаг, учир нь энэ нь цорын ганц бөгөөд үүнийг кластерын тухай хэсэгт илүү дэлгэрэнгүй авч үзэх болно.

За, одоо бид өөр терминалын цонх руу орж, сэдэв рүүгээ хоосон мессеж илгээх боломжтой.

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

Жич ашиглаж байна @ Бид "үнэт цаас цуглуулах" сэдэвт мессеж илгээж байгаагаа харуулж байна.

Энэ тохиолдолд мессеж 6-р хуваалт руу очсон - та үүнийг kafdrop дээр очиж шалгаж болно localhost:9000

Ажилчинтайгаа хамт терминалын цонх руу очиход бид loguru ашиглан илгээсэн баяртай мессежийг харах болно.

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Мөн бид mongo-г (Robo3T эсвэл Studio3T ашиглан) үзэж, үнэт цааснууд нь мэдээллийн санд байгааг харж болно:

Би тэрбумтан биш, тиймээс бид анхны үзэх сонголтдоо сэтгэл хангалуун байна.

Фауст, II хэсэг: Агентууд ба багуудын талаархи үндсэн даалгаварФауст, II хэсэг: Агентууд ба багуудын талаархи үндсэн даалгавар

Аз жаргал, баяр баясгалан - анхны агент бэлэн боллоо :)

Агент бэлэн байна, шинэ агент урт наслаарай!

Тийм ээ, ноёд оо, бид энэ өгүүллээр бэлтгэсэн замын 1/3-ийг л даван туулсан, гэхдээ бүү шантар, учир нь одоо энэ нь илүү хялбар байх болно.

Одоо бидэнд мета мэдээлэл цуглуулж цуглуулах баримт бичигт оруулах агент хэрэгтэй байна.

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Энэ агент нь тодорхой үнэт цаасны талаарх мэдээллийг боловсруулах тул бид зурваст энэ үнэт цаасны тэмдэглэгээг (тэмдэг) зааж өгөх шаардлагатай. Энэ зорилгоор Faust-д байдаг Records — агентийн сэдэв дэх мессежийн схемийг зарладаг ангиуд.

Энэ тохиолдолд бид тийшээ явцгаая records.py мөн энэ сэдвийн мессеж ямар байх ёстойг тайлбарлана уу:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Таны таамаглаж байсанчлан faust нь мессежийн схемийг тайлбарлахдаа python төрлийн тэмдэглэгээг ашигладаг тул номын сангийн дэмждэг хамгийн бага хувилбар нь 3.6.

Агент руу буцаж очоод төрлүүдийг тохируулаад нэмнэ үү:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Таны харж байгаагаар бид сэдвийг эхлүүлэх аргад схем бүхий шинэ параметрийг дамжуулдаг - үнэ цэнэ_төрөл. Цаашилбал, бүх зүйл ижил схемийн дагуу явагддаг тул би өөр зүйлд анхаарлаа хандуулах нь утгагүй юм.

За, эцсийн мэдрэгчтэй зүйл бол collect_securitites-д мета мэдээлэл цуглуулах агент руу залгах юм:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Бид мессежийн хувьд өмнө нь зарласан схемийг ашигладаг. Энэ тохиолдолд агентаас үр дүнг хүлээх шаардлагагүй тул би .cast аргыг ашигласан, гэхдээ үүнийг дурдах нь зүйтэй болов уу. арга замууд сэдэв рүү мессеж илгээх:

  1. цутгах - үр дүнг хүлээхгүй тул блоклодоггүй. Та үр дүнг өөр сэдэв рүү мессеж хэлбэрээр илгээх боломжгүй.

  2. илгээх - үр дүн хүлээхгүй тул блоклодоггүй. Та үр дүн гарах сэдвийн төлөөлөгчийг зааж өгч болно.

  3. асуух - үр дүнг хүлээж байна. Та үр дүн гарах сэдвийн төлөөлөгчийг зааж өгч болно.

Өнөөдөр энэ бүгд агентуудтай холбоотой!

Мөрөөдлийн баг

Энэ хэсэгт бичнэ гэж амласан сүүлчийн зүйл бол тушаалууд юм. Өмнө дурьдсанчлан, faust дахь командууд нь товшилтыг тойрон эргэлддэг. Үнэн хэрэгтээ faust нь -A товчлуурыг зааж өгөхдөө бидний захиалгат командыг интерфейсдээ хавсаргадаг

Зарлагдсан агентуудын дараа agents.py Чимэглэгчтэй функц нэмнэ app.commandаргыг дуудаж байна шидэлт у үнэт цаас цуглуулах:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Тиймээс, хэрэв бид тушаалуудын жагсаалтыг дуудвал бидний шинэ тушаал үүнд байх болно:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Бид үүнийг бусадтай адил ашиглаж болох тул faust worker-ийг дахин эхлүүлж, үнэт цаасны бүрэн цуглуулгыг эхлүүлцгээе:

> faust -A horton.agents start-collect-securities

Дараа нь юу болох вэ?

Дараагийн хэсэгт үлдэгдэл агентуудыг жишээ болгон ашиглан тухайн жилийн арилжааны хаалтын үнэ болон агентуудыг ажиллуулахад хэт туйлшралыг хайж олох механизмыг авч үзэх болно.

Энэ бол өнөөдрийнх! Уншсанд баярлалаа :)

Энэ хэсгийн код

Фауст, II хэсэг: Агентууд ба багуудын талаархи үндсэн даалгавар

Жич Сүүлийн хэсгийн доор надаас фауст ба нийлсэн кафкагийн тухай асуусан (confluent ямар онцлогтой вэ?). Конфлуент нь олон талаараа илүү ажиллагаатай юм шиг санагддаг, гэхдээ үнэн хэрэгтээ faust нь нэгдлийн талаар үйлчлүүлэгчийн бүрэн дэмжлэггүй байдаг. Док дахь үйлчлүүлэгчийн хязгаарлалтын тайлбар.

Эх сурвалж: www.habr.com

сэтгэгдэл нэмэх