Фауст бойынша негізгі тапсырмалар, II бөлім: Агенттер мен командалар

Фауст бойынша негізгі тапсырмалар, II бөлім: Агенттер мен командалар

Мазмұны

  1. I бөлім: Кіріспе

  2. II бөлім: Агенттер мен командалар

Біз мұнда не істеп жатырмыз?

Сонымен, екінші бөлім. Бұрын жазылғандай, онда біз келесі әрекеттерді орындаймыз:

  1. Бізге қажетті соңғы нүктелерге сұраныстармен aiohttp-те alphavantage үшін шағын клиентті жазайық.

  2. Бағалы қағаздар туралы деректерді және олар бойынша мета ақпараттарды жинайтын агент құрайық.

Бірақ, бұл біз жобаның өзі үшін жасайтын нәрсе және фауст зерттеуі тұрғысынан біз кафкадан ағындық оқиғаларды өңдейтін агенттерді қалай жазу керектігін, сондай-ақ командаларды қалай жазу керектігін (басу ораушысы), біздің жағдайда - агент бақылайтын тақырыпқа қолмен push хабарлары үшін.

Дайындау

AlphaVantage клиенті

Алдымен, альфавантқа сұраныстар үшін шағын aiohttp клиентін жазайық.

alphavantage.py

спойлер

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Шындығында, одан бәрі анық:

  1. AlphaVantage API өте қарапайым және әдемі жасалған, сондықтан мен барлық сұрауларды әдіс арқылы жасауды шештім construct_query мұнда өз кезегінде http қоңырауы бар.

  2. Мен барлық өрістерді әкелемін snake_case ыңғайлылық үшін.

  3. Әдемі және ақпараттандыратын кері бақылау нәтижесі үшін logger.catch безендіру.

PS Әлфавант таңбалауышын жергілікті түрде config.yml файлына қосуды немесе орта айнымалы мәнін экспорттауды ұмытпаңыз. HORTON_SERVICE_APIKEY. Біз белгіні аламыз осында.

CRUD класы

Бізде бағалы қағаздар туралы мета ақпаратты сақтау үшін бағалы қағаздар жинағы болады.

database/security.py

Менің ойымша, мұнда ештеңені түсіндірудің қажеті жоқ, ал базалық класстың өзі өте қарапайым.

get_app()

қолданба объектісін құру функциясын қосамыз app.py

спойлер

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Әзірге бізде ең қарапайым қосымшаны жасау болады, сәл кейінірек біз оны кеңейтеміз, бірақ сізді күтпеу үшін осы жерде сілтемелер Қолданба сыныбына. Сондай-ақ мен сізге параметрлер класын қарауға кеңес беремін, өйткені ол көптеген параметрлерге жауап береді.

Негізгі бөлім

Бағалы қағаздар тізімін жинау және жүргізу жөніндегі агент

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Сонымен, алдымен біз faust қолданбасының нысанын аламыз - бұл өте қарапайым. Әрі қарай, біз агентімізге тақырыпты жариялаймыз... Бұл жерде оның не екенін, ішкі параметрі қандай және оны қалай басқаша реттеуге болатынын атап өткен жөн.

  1. Кафкадағы тақырыптар, егер нақты анықтамасын білгіміз келсе, оқыған дұрыс өшірулі. құжатнемесе оқуға болады дерексіз Хабреде орыс тілінде, мұнда бәрі дәл көрсетілген :)

  2. Ішкі параметр, faust doc-те өте жақсы сипатталған, тақырыпты тікелей кодта конфигурациялауға мүмкіндік береді, әрине, бұл faust әзірлеушілері ұсынған параметрлерді білдіреді, мысалы: сақтау, сақтау саясаты (әдепкі бойынша жою, бірақ орнатуға болады. тығыз), бір тақырыпқа арналған бөлімдер саны (баллістеу, мысалы, аз жаһандық маңызы faust қолданбалары).

  3. Жалпы, агент жаһандық мәндері бар басқарылатын тақырыпты жасай алады, бірақ мен бәрін анық жариялауды ұнатамын. Сонымен қатар, агент жарнамасындағы тақырыптың кейбір параметрлерін (мысалы, бөлімдер саны немесе сақтау саясаты) конфигурациялау мүмкін емес.

    Тақырыпты қолмен анықтамай-ақ, бұл келесідей болуы мүмкін:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Енді агентіміз не істейтінін сипаттайық :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Сонымен, агенттің басында біз клиент арқылы сұраулар үшін aiohttp сеансын ашамыз. Осылайша, жұмысшыны іске қосқан кезде, біздің агент іске қосылғанда, сеанс бірден ашылады - бір, жұмысшы жұмыс істеп тұрған бүкіл уақыт бойы (немесе параметрді өзгертсеңіз, бірнеше). параллельдік әдепкі бірлігі бар агенттен).

Содан кейін біз ағынмен жүреміз (хабарламаны орналастырамыз _, өйткені біз, бұл агентте, тақырыбымыздағы хабарлардың мазмұнына) мән бермейміз, егер олар ағымдағы аралықта болса, әйтпесе біздің цикл олардың келуін күтеді. Ал, біздің цикл ішінде біз хабарламаны алуды тіркейміз, белсенді (get_securities әдепкі бойынша тек белсенді қайтарады, клиент кодын қараңыз) бағалы қағаздардың тізімін аламыз және оны дерекқорға сақтаймыз, сол коды бар құнды қағаз бар-жоғын тексереміз және дерекқорда алмасу , егер бар болса, онда ол (қағаз) жай ғана жаңартылады.

Шығармашылығымызды бастаймыз!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS мүмкіндіктері веб-компонент Мен мақалаларда фаустты қарастырмаймын, сондықтан біз тиісті жалаушаны орнаттық.

Іске қосу пәрменінде біз faust қолданбасына қолданба объектісін қайдан іздеу керектігін және онымен не істеу керектігін (жұмысшыны іске қосу) ақпарат журналының шығыс деңгейімен айттық. Біз келесі нәтижені аламыз:

спойлер

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Ол тірі!!!

Бөлім жиынтығын қарастырайық. Көріп отырғанымыздай, тақырып біз кодта белгілеген атаумен, бөлімдердің әдепкі санымен (8, алынған тақырып_бөлімдері - қолданба нысанының параметрі), өйткені біз тақырыбымыз үшін жеке мәнді көрсетпедік (бөлімдер арқылы). Жұмысшыда іске қосылған агентке барлық 8 бөлім тағайындалған, өйткені ол жалғыз, бірақ бұл кластерлеу туралы бөлімде толығырақ қарастырылады.

Енді біз басқа терминал терезесіне өтіп, тақырыбымызға бос хабарлама жібере аламыз:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS пайдаланады @ біз «бағалы қағаздарды жинау» деп аталатын тақырыпқа хабарлама жіберіп жатқанымызды көрсетеміз.

Бұл жағдайда хабарлама 6-бөлімге жіберілді - мұны kafdrop қосу арқылы тексеруге болады localhost:9000

Терминал терезесіне қызметкерімізбен бірге өтіп, loguru арқылы жіберілген қуанышты хабарды көреміз:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Біз сондай-ақ mongo (Robo3T немесе Studio3T көмегімен) қарап, бағалы қағаздардың дерекқорда екенін көре аламыз:

Мен миллиардер емеспін, сондықтан біз бірінші көру опциясына қанағаттанамыз.

Фауст бойынша негізгі тапсырмалар, II бөлім: Агенттер мен командаларФауст бойынша негізгі тапсырмалар, II бөлім: Агенттер мен командалар

Бақыт пен қуаныш - бірінші агент дайын :)

Агент дайын, жаңа агент аман болсын!

Иә, мырзалар, біз осы мақалада дайындалған жолдың 1/3 бөлігін ғана өттік, бірақ мұңаймаңыздар, өйткені енді бұл оңайырақ болады.

Енді бізге мета ақпаратты жинайтын және оны жинақ құжатына енгізетін агент қажет:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Бұл агент белгілі бір бағалы қағаз туралы ақпаратты өңдейтіндіктен, хабарда осы бағалы қағаздың белгісін (таңбасын) көрсету керек. Осы мақсатта фауста бар Records — агент тақырыбында хабарлама схемасын жариялайтын сыныптар.

Бұл жағдайда келесіге барайық records.py және осы тақырыпқа арналған хабардың қандай болуы керектігін сипаттаңыз:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Сіз болжағандай, faust хабарлама схемасын сипаттау үшін питон түріндегі аннотацияны пайдаланады, сондықтан кітапхана қолдайтын ең аз нұсқа 3.6.

Агентке оралайық, түрлерін орнатып, оны қосыңыз:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Көріп отырғаныңыздай, біз тақырыпты инициализациялау әдісіне схемасы бар жаңа параметрді береміз - мән_түрі. Сонымен қатар, бәрі бірдей схема бойынша жүреді, сондықтан мен басқа ештеңеге тоқталудың мағынасын көрмеймін.

Ал, соңғы жанасу - collect_securitites үшін мета ақпарат жинау агентіне қоңырау қосу:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Хабарлама үшін бұрын жарияланған схеманы қолданамыз. Бұл жағдайда мен .cast әдісін қолдандым, себебі агенттен нәтиже күтудің қажеті жоқ, бірақ бұл туралы айта кеткен жөн. тәсілдері тақырыпқа хабарлама жіберіңіз:

  1. cast - блоктамайды, себебі ол нәтиже күтпейді. Нәтижені басқа тақырыпқа хабарлама ретінде жібере алмайсыз.

  2. жіберу - блоктамайды, себебі ол нәтиже күтпейді. Нәтиже баратын тақырыпта агентті көрсетуге болады.

  3. сұрау - нәтиже күтеді. Нәтиже баратын тақырыпта агентті көрсетуге болады.

Сонымен, бүгінгі агенттермен бәрі осы!

Арман командасы

Бұл бөлімде жазуға уәде берген соңғы нәрсе - командалар. Жоғарыда айтылғандай, faust пәрмендері басу айналасындағы орауыш болып табылады. Шын мәнінде, faust -A пернесін көрсеткенде интерфейсіне біздің пайдаланушы пәрменін тіркейді

Жарияланған агенттерден кейін agents.py декораторы бар функцияны қосыңыз app.commandәдісті шақыру құйылған у жинау_қауіпсіздігі:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Осылайша, егер біз командалар тізімін шақырсақ, онда біздің жаңа пәрменіміз болады:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Біз оны басқалар сияқты пайдалана аламыз, сондықтан біз фауст жұмысшысын қайта іске қосып, толыққанды бағалы қағаздар жинағын бастаймыз:

> faust -A horton.agents start-collect-securities

Келесіде не болады?

Келесі бөлімде, мысал ретінде қалған агенттерді пайдалана отырып, біз жылдағы сауданың жабылу бағаларындағы экстремалды іздеу механизмін және агенттерді іске қосуды қарастырамыз.

Бүгінге бәрі осы! Оқығаныңызға рахмет :)

Осы бөлікке арналған код

Фауст бойынша негізгі тапсырмалар, II бөлім: Агенттер мен командалар

PS Соңғы бөлімде маған фауст пен конфлюенттік кафка туралы сұрақ қойылды (конфлуенттің қандай қасиеттері бар?). Confluent көптеген жолдармен функционалдырақ болып көрінеді, бірақ факт мынада, фауста конфлуент үшін толық клиенттік қолдау жоқ - бұл келесіден туындайды. құжаттағы клиент шектеулерінің сипаттамасы.

Ақпарат көзі: www.habr.com

пікір қалдыру