A więc druga część. Jak napisano wcześniej, wykonamy w nim następujące czynności:
Napiszmy małego klienta dla alphavantage na aiohttp z żądaniami dotyczącymi potrzebnych nam punktów końcowych.
Stwórzmy agenta, który będzie zbierał dane o papierach wartościowych i metainformacje na ich temat.
Ale to właśnie zrobimy dla samego projektu, a jeśli chodzi o badania fausta, nauczymy się pisać agenty przetwarzające zdarzenia strumieniowe z kafki, a także jak pisać polecenia (opakowanie kliknięć), w naszym przypadku - dla ręcznych wiadomości push na temat monitorowany przez agenta.
Szkolenie
Klient AlphaVantage
Najpierw napiszmy małego klienta aiohttp dla żądań do alphavantage.
Interfejs API AlphaVantage jest dość prosty i pięknie zaprojektowany, dlatego zdecydowałem się wysyłać wszystkie żądania za pomocą tej metody construct_query gdzie z kolei następuje wywołanie http.
Sprowadzam wszystkie pola snake_case dla komfortu.
Cóż, dekoracja logger.catch zapewniająca piękne i pouczające wyniki śledzenia.
PS Nie zapomnij dodać lokalnie tokenu alphavantage do pliku config.yml lub wyeksportuj zmienną środowiskową HORTON_SERVICE_APIKEY. Otrzymujemy token tutaj.
klasa CRUD
Będziemy mieć zbiór papierów wartościowych do przechowywania metainformacji o papierach wartościowych.
Na razie będziemy mieli najprostszą metodę tworzenia aplikacji, nieco później ją jednak rozwiniemy, aby nie każ Wam czekać na czekanie, tutaj Bibliografia do klasy aplikacji. Radzę również przyjrzeć się klasie ustawień, ponieważ jest ona odpowiedzialna za większość ustawień.
Główna część
Agent zajmujący się gromadzeniem i prowadzeniem wykazu papierów wartościowych
Zatem najpierw otrzymujemy obiekt aplikacji Faust - jest to całkiem proste. Następnie wyraźnie deklarujemy temat dla naszego agenta... Warto w tym miejscu wspomnieć, czym jest, jaki jest parametr wewnętrzny i jak można to inaczej ułożyć.
Tematy w kafce, jeśli chcemy poznać dokładną definicję, lepiej poczytać wyłączony. dokumentlub możesz przeczytać kompendium na Habré po rosyjsku, gdzie też wszystko jest dość trafnie odzwierciedlone :)
Parametr wewnętrzny, dość dobrze opisany w dokumencie Fausta, pozwala nam skonfigurować temat bezpośrednio w kodzie, oczywiście oznacza to parametry dostarczone przez twórców Fausta, na przykład: retencja, polityka przechowywania (domyślnie usuń, ale możesz ustawić kompaktowy), liczba partycji na temat (wynikizrobić na przykład mniej niż wartość globalna aplikacje faustowe).
Ogólnie rzecz biorąc, agent może utworzyć zarządzany temat z wartościami globalnymi, jednak lubię deklarować wszystko jawnie. Ponadto nie można skonfigurować niektórych parametrów (na przykład liczby partycji lub zasad przechowywania) tematu w ogłoszeniu agenta.
Oto jak mogłoby to wyglądać bez ręcznego definiowania tematu:
No cóż, teraz opiszemy, czym będzie się zajmował nasz agent :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Zatem na początku agenta otwieramy sesję aiohttp dla żądań za pośrednictwem naszego klienta. Zatem przy uruchamianiu workera, gdy zostanie uruchomiony nasz agent, od razu zostanie otwarta sesja - jedna na cały czas pracy workera (lub kilka, jeśli zmienisz parametr współbieżność od agenta z domyślną jednostką).
Następnie podążamy za strumieniem (umieszczamy wiadomość w _, ponieważ my w tym agencie nie dbamy o treść) wiadomości z naszego tematu, jeśli istnieją w bieżącym przesunięciu, w przeciwnym razie nasz cykl będzie czekał na ich przybycie. Cóż, w naszej pętli logujemy otrzymanie wiadomości, uzyskujemy listę aktywnych (get_securities domyślnie zwraca tylko aktywne, zobacz kod klienta) papierów wartościowych i zapisujemy ją do bazy danych, sprawdzając, czy istnieje papier wartościowy z tym samym tickerem i wymiana w bazie danych, jeśli takowa istnieje, to ona (papier) zostanie po prostu zaktualizowana.
Uruchommy nasze dzieło!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
Funkcje PS komponent sieciowy Nie będę rozważał fausta w artykułach, dlatego ustawiamy odpowiednią flagę.
W naszym poleceniu uruchomienia powiedzieliśmy Faustowi, gdzie szukać obiektu aplikacji i co z nim zrobić (uruchomić proces roboczy) za pomocą poziomu wyjściowego dziennika informacyjnego. Otrzymujemy następujące dane wyjściowe:
Spójrzmy na zestaw partycji. Jak widzimy, założono temat o nazwie, którą wyznaczyliśmy w kodzie, domyślnej liczbie partycji (8, wziętych z temat_partycje - parametr obiektu aplikacji), ponieważ nie określiliśmy indywidualnej wartości dla naszego tematu (poprzez partycje). Do uruchomionego agenta w workerze przypisane są wszystkie 8 partycji, ponieważ jest to jedyna partycja, ale zostanie to omówione bardziej szczegółowo w części dotyczącej klastrowania.
Cóż, teraz możemy przejść do innego okna terminala i wysłać pustą wiadomość na nasz temat:
Używam PS @ pokazujemy, że wysyłamy wiadomość do tematu o nazwie „collect_securities”.
W tym przypadku wiadomość trafiła do partycji 6 - możesz to sprawdzić wchodząc na kafdrop on localhost:9000
Podchodząc z naszym pracownikiem do okna terminala, zobaczymy wesołą wiadomość wysłaną za pomocą loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Możemy również zajrzeć do mongo (używając Robo3T lub Studio3T) i sprawdzić, czy papiery wartościowe znajdują się w bazie danych:
Nie jestem miliarderem i dlatego zadowalamy się pierwszą opcją oglądania.
Szczęście i radość - pierwszy agent gotowy :)
Agent gotowy, niech żyje nowy agent!
Tak, panowie, przeszliśmy dopiero 1/3 ścieżki przygotowanej w tym artykule, ale nie zniechęcajcie się, bo teraz będzie łatwiej.
Zatem teraz potrzebujemy agenta, który zbiera metainformacje i umieszcza je w dokumencie zbiorczym:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Ponieważ agent ten będzie przetwarzał informacje o konkretnym zabezpieczeniu, należy w wiadomości wskazać znacznik (symbol) tego zabezpieczenia. W tym celu w fauście są Dokumentacja — klasy deklarujące schemat komunikatów w temacie agenta.
W tym przypadku przejdźmy do zapisy.pyi opisz, jak powinna wyglądać wiadomość dla tego tematu:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Jak można się domyślić, Faust używa adnotacji typu Pythona do opisu schematu komunikatu, dlatego minimalna wersja obsługiwana przez bibliotekę to 3.6.
Jak widać do metody inicjalizacji tematu przekazujemy nowy parametr ze schematem - typ_wartości. Co więcej, wszystko przebiega według tego samego schematu, więc nie widzę sensu rozwodzić się nad czymkolwiek innym.
Cóż, ostatnim akcentem jest dodanie wywołania agenta zbierającego metainformacje do Collect_securitites:
Używamy wcześniej ogłoszonego schematu przekazu. W tym przypadku użyłem metody .cast, ponieważ nie musimy czekać na wynik od agenta, ale warto o tym wspomnieć sposoby wyślij wiadomość w temacie:
cast - nie blokuje, bo nie oczekuje wyniku. Nie możesz wysłać wyniku do innego tematu jako wiadomości.
send - nie blokuje, ponieważ nie oczekuje wyniku. W temacie możesz określić agenta, do którego trafi wynik.
zapytaj - czeka na wynik. W temacie możesz określić agenta, do którego trafi wynik.
To tyle z agentów na dziś!
Drużyna marzeń
Ostatnią rzeczą, którą obiecałem napisać w tej części, są polecenia. Jak wspomniano wcześniej, polecenia w faust otaczają kliknięcie. W rzeczywistości Faust po prostu dołącza nasze niestandardowe polecenie do swojego interfejsu, podając klawisz -A
Po ogłoszonych agentach w agenci.py dodaj funkcję z dekoratorem polecenie.aplikacjiwywołanie metody rzucać у zbieranie_sekurytyzacji:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Zatem jeśli wywołamy listę poleceń, będzie w niej znajdować się nasze nowe polecenie:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Możemy z niego korzystać jak każdy inny, zatem zrestartujmy fausta workera i rozpocznijmy pełnoprawną zbiórkę papierów wartościowych:
> faust -A horton.agents start-collect-securities
Co się później stanie?
W dalszej części, na przykładzie pozostałych agentów, rozważymy mechanizm ujścia służący do wyszukiwania ekstremów w cenach zamknięcia handlu na dany rok i cronowego uruchomienia agentów.
To wszystko na dzisiaj! Dziękuje za przeczytanie :)
PS W ostatniej części zapytano mnie o faustną i zlewającą się kafkę (jakie funkcje ma konfluent?). Wydaje się, że confluent jest pod wieloma względami bardziej funkcjonalny, jednak faktem jest, że Faust nie posiada pełnego wsparcia klienckiego dla confluent - wynika to z opisy ograniczeń klienta w doc.