Зміст
Частина II: Агенти та Команди
Що ми тут робимо?
Отже, друга частина. Як і писалося раніше, у ній ми зробимо таке:
Напишемо невеликий клієнтик для alphavantage на aiohttp із запитами на потрібні нам ендпоінти.
Зробимо агента, який збиратиме дані про цінні папери та мета інформацію щодо них.
Але це те, що ми зробимо для самого проекту, а в плані дослідження faust ми дізнаємося, як писати агентів, які обробляють стрим подій з kafka, а так само як написати команди (обгортка на click), у нашому випадку — для ручного пуша повідомлення у топік, за яким стежить агент.
Підготовка
Клієнт AlphaVantage
Для початку напишемо невеликий aiohttp клієнтик для запитів на alphavantage.
Спойлер
import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union
import aiohttp
import pandas as pd
import stringcase
from loguru import logger
from horton.config import API_ENDPOINT
class AlphaVantageClient:
def __init__(
self,
session: aiohttp.ClientSession,
api_key: str,
api_endpoint: str = API_ENDPOINT,
):
self._query_params = {"datatype": "json", "apikey": api_key}
self._api_endpoint = api_endpoint
self._session = session
@logger.catch
def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
formatted_data = {}
for field, item in data.items():
formatted_data[stringcase.snakecase(field)] = item
return formatted_data
@logger.catch
async def _construct_query(
self, function: str, to_json: bool = True, **kwargs
) -> Union[Dict[str, Any], str]:
path = "query/"
async with self._session.get(
urlparse.urljoin(self._api_endpoint, path),
params={"function": function, **kwargs, **self._query_params},
) as response:
data = (await response.json()) if to_json else (await response.text())
if to_json:
data = self._format_fields(data)
return data
@logger.catch
async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)
data = pd.read_csv(StringIO(data))
securities = data.to_dict("records")
for index, security in enumerate(securities):
security = self._format_fields(security)
security["_type"] = "physical"
securities[index] = security
return securities
@logger.catch
async def get_security_overview(self, symbol: str) -> Dict[str, str]:
return await self._construct_query("OVERVIEW", symbol=symbol)
@logger.catch
async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query(
"TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
)
@logger.catch
async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)
@logger.catch
async def get_indicator_data(
self, symbol: str, indicator: str, **indicator_options
) -> Dict[str, Any]:
return await self._construct_query(
indicator, symbol=symbol, **indicator_options
)
Власне по ньому все ясно:
API AlphaVantage досить просто та красиво спроектована, тому всі запити я вирішив проводити через метод
construct_queryде своє чергу йде http виклик.Усі поля я приводжу до
snake_caseдля зручності.Ну і декорація logger.catch для гарного та інформативного виведення трейсбеку.
PS Незабутнє локально додати токен alphavantage в config.yml, або експортувати змінне середовище HORTON_SERVICE_APIKEY. Отримуємо токен .
CRUD-клас
У нас буде колекція securities для зберігання мета інформації про цінні папери.
Тут нічого пояснювати не потрібно, а базовий клас сам по собі досить простий.
get_app()
Додамо функцію створення об'єкта програми в
Спойлер
import faust
from horton.config import KAFKA_BROKERS
def get_app():
return faust.App("horton", broker=KAFKA_BROKERS)
Поки в нас буде найпростіше створення програми, трохи пізніше ми його розширимо, однак, щоб не змушувати вас чекати, ось на App-клас. На клас settings теж раджу подивитись, тому що саме він відповідає за більшу частину налаштувань.
Основна частина
Агент збору та збереження списку цінних паперів
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
passТак, спочатку отримуємо об'єкт faust-додатку – це досить просто. Далі, ми явно оголошуємо топік для нашого агента ... Тут варто згадати, що це таке, що за параметр internal і як це можна влаштувати по-іншому.
Топики в kafka, якщо ми хочемо дізнатися точне визначення, краще прочитати , або можна прочитати на хабре російською, де так само все досить точно відображено 🙂
, Досить добре описаний в доці faust, дозволяє нам налаштовувати топік прямо в коді, природно, маються на увазі параметри, передбачені розробниками faust, наприклад: retention, retention policy (за замовчуванням delete, але можна встановити і ), кількість партицій на топік (, щоб зробити, наприклад, менше ніж програми faust).
Взагалі, агент може створювати сам керований топік з глобальними значеннями, проте, я люблю оголошувати все явно. До того ж, деякі параметри (наприклад, кількість партицій або retention policy) топіка в оголошенні агента налаштувати не можна.
Ось як це могло виглядати без ручного визначення топіка:
app = get_app()
@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
passНу а тепер опишемо, що робитиме наш агент 🙂
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Отже, спочатку агента ми відкриваємо aiohttp сесію для запитів через наш клієнт. Таким чином, при запуску воркера, коли буде запущено наш агент, відразу ж буде відкрито сесію — одну, на весь час роботи воркера (або кілька, якщо змінити параметр у агента з дефолтної одиниці).
Далі, ми йдемо по стриму (повідомлення ми поміщаємо в _, тому що нам, в даному агенті, байдуже зміст) повідомлень з нашого топіка, якщо вони є при поточному зрушенні (offset), інакше, наш цикл чекатиме їх надходження. Ну а всередині нашого циклу, ми логуємо надходження повідомлення, отримуємо список активних цінних паперів і зберігаємо його в базу, перевіряючи при цьому, чи є папір з таким тикером і біржею в БД. якщо є, то вона (папір) просто оновиться.
Запустимо наше творіння!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l infoPS Можливості faust я розглядати у статтях не буду, тож виставляємо відповідний прапор.
У нашій команді запуску ми вказали faust'у, де шукати об'єкт програми та що робити з ним (запустити воркер) з рівнем виведення логів info. Отримуємо наступний висновок:
Спойлер
┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id │ horton │
│ transport │ [URL('kafka://localhost:9092')] │
│ store │ memory: │
│ log │ -stderr- (info) │
│ pid │ 1271262 │
│ hostname │ host-name │
│ platform │ CPython 3.8.2 (Linux x86_64) │
│ drivers │ │
│ transport │ aiokafka=1.1.6 │
│ web │ aiohttp=3.6.2 │
│ datadir │ /path/to/project/horton-data │
│ appdir │ /path/to/project/horton-data/v1 │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...
┌Topic Partition Set─────────┬────────────┐
│ topic │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities │ {0-7} │
│ horton-__assignor-__leader │ {0} │
└────────────────────────────┴────────────┘ Воно живе!
Подивимося на partition set. Як ми бачимо, був створений топік з ім'ям, яке ми позначили в коді, кількість партицій дефолтна (8, взята з - Параметра об'єкта докладання), так як у нашого топіка ми індивідуальне значення (через partitions) не вказували. Запущеному агенту у воркері відведено всі 8 партицицій, оскільки він єдиний, але про це буде детальніше щодо кластерингу.
Що ж, тепер можемо зайти в інше вікно терміналу і відправити порожнє повідомлення до нашого топіка:
> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}PS за допомогою @ ми показуємо, що надсилаємо повідомлення в топік з ім'ям «collect_securities».
В даному випадку, повідомлення пішло в 6 партицію - це можна перевірити, зайшовши в kafdrop localhost:9000
Перейшовши у вікно терміналу з нашим воркером, ми побачимо радісне повідомлення надіслане за допомогою loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securitiesТакож, можемо заглянути в mongo (за допомогою Robo3T або Studio3T) і побачити, що цінні папери в базі:
Я не мільярдер, а тому задовольняємося першим варіантом перегляду.
Щастя та радість - перший агент готовий 🙂
Агент готовий, нехай живе новий агент!
Так, панове, нами пройдено лише 1/3 шляху, приготованого цією статтею, але не сумуйте, тому що зараз буде вже легше.
Отже, тепер нам потрібен агент, який збирає інформацію і складає її в документ колекції:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...Так як цей агент оброблятиме інформацію про конкретну security, нам потрібно в повідомленні вказати тикер цього паперу. Для цього у faust існують - Класи, що декларують схему повідомлення в топіці агента.
У такому разі перейдемо в і опишемо, як має виглядати повідомлення у цього топіка:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Як ви вже могли здогадатися, faust для опису схеми повідомлення використовує анотацію типів у python, тому й мінімальна версія, яку підтримує бібліотека. .
Повернемося до агента, встановимо типи та допишемо його:
collect_security_overview_topic = app.topic(
"collect_security_overview", internal=True, value_type=CollectSecurityOverview
)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
logger.info(
"Start collect security [{symbol}] overview", symbol=event.symbol
)
client = AlphaVantageClient(session, API_KEY)
security_overview = await client.get_security_overview(event.symbol)
await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)
yield True
Як бачите, ми передаємо метод ініціалізації топіка новий параметр зі схемою — value_type. Далі, все за тією ж схемою, тому зупинятися на чомусь ще — сенсу не бачу.
Ну що ж, останній штрих — додамо в collect_securitites виклик агента збору інформації:
....
for security in securities:
await SecurityCRUD.update_one({
"symbol": security["symbol"],
"exchange": security["exchange"]
},
security,
upsert = True,
)
await collect_security_overview.cast(
CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
)
....Використовуємо оголошення раніше схему для повідомлення. В даному випадку, я використовував метод .cast, тому що нам не потрібно очікувати результату від агента, але варто згадати, що надіслати повідомлення в топік:
cast — не блокує, оскільки не очікує на результат. Не можна надіслати результат в інший топік повідомленням.
send - не блокує, тому що не очікує результату. Можна вказати агента в топік якого піде результат.
ask - очікує на результат. Можна вказати агента в топік якого піде результат.
Отже, на цьому з агентами сьогодні все!
Команда мрії
Останнє, що я обіцяв написати в цій частині команди. Як уже говорилося раніше, команди faust - це обгортка над click. Фактично faust просто приєднує нашу кастомну команду до свого інтерфейсу за вказівкою ключа -A
Після оголошених агентів у додамо функцію з декоратором app.command, що викликає метод кинути у collect_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Таким чином, якщо ми викличемо список команд, у ньому буде і наша нова команда:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Нею ми можемо скористатися як будь-який інший, тому перезапустимо faust воркер і почнемо повноцінний збір цінних паперів:
> faust -A horton.agents start-collect-securitiesЩо буде далі?
У наступній частині ми, на прикладі агентів, що залишилися, розглянемо, механізм sink для пошуку екстремум у цінах закриття торгів за рік і cron-запуск агентів.
На сьогодні все! Дякую за прочитання 🙂
PS Під минулою частиною мене запитали про faust та confluent kafka (). Здається, що confluent багато в чому функціональніший, але річ у тому, що faust не має повноцінної підтримки клієнта для confluent — це випливає з.
Джерело: habr.com
