Zadania podstawowe dotyczące Fausta, część II: Agenci i zespoły

Zadania podstawowe dotyczące Fausta, część II: Agenci i zespoły

Spis treści

  1. Część I: Wprowadzenie

  2. Część II: Agenci i zespoły

Co my tu robimy?

A więc druga część. Jak napisano wcześniej, wykonamy w nim następujące czynności:

  1. Napiszmy małego klienta dla alphavantage na aiohttp z żądaniami dotyczącymi potrzebnych nam punktów końcowych.

  2. Stwórzmy agenta, który będzie zbierał dane o papierach wartościowych i metainformacje na ich temat.

Ale to właśnie zrobimy dla samego projektu, a jeśli chodzi o badania fausta, nauczymy się pisać agenty przetwarzające zdarzenia strumieniowe z kafki, a także jak pisać polecenia (opakowanie kliknięć), w naszym przypadku - dla ręcznych wiadomości push na temat monitorowany przez agenta.

Szkolenie

Klient AlphaVantage

Najpierw napiszmy małego klienta aiohttp dla żądań do alphavantage.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Właściwie wszystko jest z tego jasne:

  1. Interfejs API AlphaVantage jest dość prosty i pięknie zaprojektowany, dlatego zdecydowałem się wysyłać wszystkie żądania za pomocą tej metody construct_query gdzie z kolei następuje wywołanie http.

  2. Sprowadzam wszystkie pola snake_case dla komfortu.

  3. Cóż, dekoracja logger.catch zapewniająca piękne i pouczające wyniki śledzenia.

PS Nie zapomnij dodać lokalnie tokenu alphavantage do pliku config.yml lub wyeksportuj zmienną środowiskową HORTON_SERVICE_APIKEY. Otrzymujemy token tutaj.

klasa CRUD

Będziemy mieć zbiór papierów wartościowych do przechowywania metainformacji o papierach wartościowych.

baza danych/bezpieczeństwo.py

Moim zdaniem nie trzeba tu nic wyjaśniać, a sama klasa bazowa jest dość prosta.

Ściągnij aplikację()

Dodajmy funkcję tworzenia obiektu aplikacji w aplikacja.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Na razie będziemy mieli najprostszą metodę tworzenia aplikacji, nieco później ją jednak rozwiniemy, aby nie każ Wam czekać na czekanie, tutaj Bibliografia do klasy aplikacji. Radzę również przyjrzeć się klasie ustawień, ponieważ jest ona odpowiedzialna za większość ustawień.

Główna część

Agent zajmujący się gromadzeniem i prowadzeniem wykazu papierów wartościowych

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Zatem najpierw otrzymujemy obiekt aplikacji Faust - jest to całkiem proste. Następnie wyraźnie deklarujemy temat dla naszego agenta... Warto w tym miejscu wspomnieć, czym jest, jaki jest parametr wewnętrzny i jak można to inaczej ułożyć.

  1. Tematy w kafce, jeśli chcemy poznać dokładną definicję, lepiej poczytać wyłączony. dokumentlub możesz przeczytać kompendium na Habré po rosyjsku, gdzie też wszystko jest dość trafnie odzwierciedlone :)

  2. Parametr wewnętrzny, dość dobrze opisany w dokumencie Fausta, pozwala nam skonfigurować temat bezpośrednio w kodzie, oczywiście oznacza to parametry dostarczone przez twórców Fausta, na przykład: retencja, polityka przechowywania (domyślnie usuń, ale możesz ustawić kompaktowy), liczba partycji na temat (wynikizrobić na przykład mniej niż wartość globalna aplikacje faustowe).

  3. Ogólnie rzecz biorąc, agent może utworzyć zarządzany temat z wartościami globalnymi, jednak lubię deklarować wszystko jawnie. Ponadto nie można skonfigurować niektórych parametrów (na przykład liczby partycji lub zasad przechowywania) tematu w ogłoszeniu agenta.

    Oto jak mogłoby to wyglądać bez ręcznego definiowania tematu:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

No cóż, teraz opiszemy, czym będzie się zajmował nasz agent :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Zatem na początku agenta otwieramy sesję aiohttp dla żądań za pośrednictwem naszego klienta. Zatem przy uruchamianiu workera, gdy zostanie uruchomiony nasz agent, od razu zostanie otwarta sesja - jedna na cały czas pracy workera (lub kilka, jeśli zmienisz parametr współbieżność od agenta z domyślną jednostką).

Następnie podążamy za strumieniem (umieszczamy wiadomość w _, ponieważ my w tym agencie nie dbamy o treść) wiadomości z naszego tematu, jeśli istnieją w bieżącym przesunięciu, w przeciwnym razie nasz cykl będzie czekał na ich przybycie. Cóż, w naszej pętli logujemy otrzymanie wiadomości, uzyskujemy listę aktywnych (get_securities domyślnie zwraca tylko aktywne, zobacz kod klienta) papierów wartościowych i zapisujemy ją do bazy danych, sprawdzając, czy istnieje papier wartościowy z tym samym tickerem i wymiana w bazie danych, jeśli takowa istnieje, to ona (papier) zostanie po prostu zaktualizowana.

Uruchommy nasze dzieło!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Funkcje PS komponent sieciowy Nie będę rozważał fausta w artykułach, dlatego ustawiamy odpowiednią flagę.

W naszym poleceniu uruchomienia powiedzieliśmy Faustowi, gdzie szukać obiektu aplikacji i co z nim zrobić (uruchomić proces roboczy) za pomocą poziomu wyjściowego dziennika informacyjnego. Otrzymujemy następujące dane wyjściowe:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

To żyje!!!

Spójrzmy na zestaw partycji. Jak widzimy, założono temat o nazwie, którą wyznaczyliśmy w kodzie, domyślnej liczbie partycji (8, wziętych z temat_partycje - parametr obiektu aplikacji), ponieważ nie określiliśmy indywidualnej wartości dla naszego tematu (poprzez partycje). Do uruchomionego agenta w workerze przypisane są wszystkie 8 partycji, ponieważ jest to jedyna partycja, ale zostanie to omówione bardziej szczegółowo w części dotyczącej klastrowania.

Cóż, teraz możemy przejść do innego okna terminala i wysłać pustą wiadomość na nasz temat:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

Używam PS @ pokazujemy, że wysyłamy wiadomość do tematu o nazwie „collect_securities”.

W tym przypadku wiadomość trafiła do partycji 6 - możesz to sprawdzić wchodząc na kafdrop on localhost:9000

Podchodząc z naszym pracownikiem do okna terminala, zobaczymy wesołą wiadomość wysłaną za pomocą loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Możemy również zajrzeć do mongo (używając Robo3T lub Studio3T) i sprawdzić, czy papiery wartościowe znajdują się w bazie danych:

Nie jestem miliarderem i dlatego zadowalamy się pierwszą opcją oglądania.

Zadania podstawowe dotyczące Fausta, część II: Agenci i zespołyZadania podstawowe dotyczące Fausta, część II: Agenci i zespoły

Szczęście i radość - pierwszy agent gotowy :)

Agent gotowy, niech żyje nowy agent!

Tak, panowie, przeszliśmy dopiero 1/3 ścieżki przygotowanej w tym artykule, ale nie zniechęcajcie się, bo teraz będzie łatwiej.

Zatem teraz potrzebujemy agenta, który zbiera metainformacje i umieszcza je w dokumencie zbiorczym:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Ponieważ agent ten będzie przetwarzał informacje o konkretnym zabezpieczeniu, należy w wiadomości wskazać znacznik (symbol) tego zabezpieczenia. W tym celu w fauście są Dokumentacja — klasy deklarujące schemat komunikatów w temacie agenta.

W tym przypadku przejdźmy do zapisy.py i opisz, jak powinna wyglądać wiadomość dla tego tematu:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Jak można się domyślić, Faust używa adnotacji typu Pythona do opisu schematu komunikatu, dlatego minimalna wersja obsługiwana przez bibliotekę to 3.6.

Wróćmy do agenta, ustawmy typy i dodajmy:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Jak widać do metody inicjalizacji tematu przekazujemy nowy parametr ze schematem - typ_wartości. Co więcej, wszystko przebiega według tego samego schematu, więc nie widzę sensu rozwodzić się nad czymkolwiek innym.

Cóż, ostatnim akcentem jest dodanie wywołania agenta zbierającego metainformacje do Collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Używamy wcześniej ogłoszonego schematu przekazu. W tym przypadku użyłem metody .cast, ponieważ nie musimy czekać na wynik od agenta, ale warto o tym wspomnieć sposoby wyślij wiadomość w temacie:

  1. cast - nie blokuje, bo nie oczekuje wyniku. Nie możesz wysłać wyniku do innego tematu jako wiadomości.

  2. send - nie blokuje, ponieważ nie oczekuje wyniku. W temacie możesz określić agenta, do którego trafi wynik.

  3. zapytaj - czeka na wynik. W temacie możesz określić agenta, do którego trafi wynik.

To tyle z agentów na dziś!

Drużyna marzeń

Ostatnią rzeczą, którą obiecałem napisać w tej części, są polecenia. Jak wspomniano wcześniej, polecenia w faust otaczają kliknięcie. W rzeczywistości Faust po prostu dołącza nasze niestandardowe polecenie do swojego interfejsu, podając klawisz -A

Po ogłoszonych agentach w agenci.py dodaj funkcję z dekoratorem polecenie.aplikacjiwywołanie metody rzucać у zbieranie_sekurytyzacji:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Zatem jeśli wywołamy listę poleceń, będzie w niej znajdować się nasze nowe polecenie:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Możemy z niego korzystać jak każdy inny, zatem zrestartujmy fausta workera i rozpocznijmy pełnoprawną zbiórkę papierów wartościowych:

> faust -A horton.agents start-collect-securities

Co się później stanie?

W dalszej części, na przykładzie pozostałych agentów, rozważymy mechanizm ujścia służący do wyszukiwania ekstremów w cenach zamknięcia handlu na dany rok i cronowego uruchomienia agentów.

To wszystko na dzisiaj! Dziękuje za przeczytanie :)

Kod dla tej części

Zadania podstawowe dotyczące Fausta, część II: Agenci i zespoły

PS W ostatniej części zapytano mnie o faustną i zlewającą się kafkę (jakie funkcje ma konfluent?). Wydaje się, że confluent jest pod wieloma względami bardziej funkcjonalny, jednak faktem jest, że Faust nie posiada pełnego wsparcia klienckiego dla confluent - wynika to z opisy ograniczeń klienta w doc.

Źródło: www.habr.com

Dodaj komentarz