Итак-итак, вторая часть. Как и писалось ранее, в ней мы сделаем следующее:
Напишем небольшой клиентик для alphavantage на aiohttp с запросами на нужные нам эндпоинты.
Сделаем агента, который будет собирать данные о ценных бумагах и мета информацию по ним.
Но, это то, что мы сделаем для самого проекта, а в плане исследования faust мы узнаем, как писать агентов, обрабатывающих стрим событий из kafka, а так же как написать команды (обёртка на click), в нашем случаи — для ручного пуша сообщения в топик, за которым следит агент.
Подготовка
Клиент AlphaVantage
Для начала, напишем небольшой aiohttp клиентик для запросов на alphavantage.
API AlphaVantage достаточно просто и красиво спроектирована, поэтому все запросы я решил проводить через метод construct_query где в свою очередь идёт http вызов.
Все поля я привожу к snake_case для удобства.
Ну и декорация logger.catch для красивого и информативного вывода трейсбека.
P.S. Незабываем локально добавить токен alphavantage в config.yml, либо экспортировать переменную среды HORTON_SERVICE_APIKEY. Получаем токен тут.
CRUD-класс
У нас будет коллекция securities для хранения мета информации о ценных бумагах.
Пока у нас будет самое простое создание приложения, чуть позже мы его расширим, однако, чтобы не заставлять вас ждать, вот референсы на App-класс. На класс settings тоже советую взглянуть, так как именно он отвечает за большую часть настроек.
Так, сначала получаем объект faust-приложения — это достаточно просто. Далее, мы явно объявляем топик для нашего агента… Тут стоит упомянуть, что это такое, что за параметр internal и как это можно устроить по-другому.
Топики в kafka, если мы хотим узнать точное определение, то лучше прочитать офф. доку, либо можно прочитать конспект на хабре на русском, где так же всё достаточно точно отражено 🙂
Параметр internal, достаточно хорошо описанный в доке faust, позволяет нам настраивать топик прямо в коде, естественно, имеются ввиду параметры, предусмотренные разработчиками faust, например: retention, retention policy (по-умолчанию delete, но можно установить и compact), кол-во партиций на топик (partitions, чтобы сделать, например, меньшее чем глобальное значение приложения faust).
Вообще, агент может создавать сам управляемый топик с глобальными значениями, однако, я люблю объявлять всё явно. К тому же, некоторые параметры (например, кол-во партиций или retention policy) топика в объявлении агента настроить нельзя.
Вот как это могло было выглядеть без ручного определения топика:
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Итак, в начале агента мы открываем aiohttp сессию для запросов через наш клиент. Таким образом, при запуске воркера, когда будет запущен наш агент, сразу же будет открыта сессия — одна, на всё время работы воркера (или несколько, если изменить параметр concurrency у агента с дефолтной единички).
Далее, мы идём по стриму (сообщение мы помещаем в _, так как нам, в данном агенте, безразлично содержание) сообщений из нашего топика, если они есть при текущем сдвиге (offset), иначе, наш цикл будет ожидать их поступления. Ну а внутри нашего цикла, мы логируем поступление сообщения, получаем список активных (get_securities возвращает по-умолчания только active, см. код клиента) ценных бумаг и сохраняем его в базу, проверяя при этом, есть ли бумага с таким тикером и биржей в БД, если есть, то она (бумага) просто обновится.
Запустим наше творение!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
P.S. Возможности веб-компонента faust я рассматривать в статьях не буду, поэтому выставляем соответствующий флаг.
В нашей команде запуска мы указали faust’у, где искать объект приложения и что делать с ним (запустить воркер) с уровнем вывода логов info. Получаем следующий вывод:
Посмотрим на partition set. Как мы видим, был создан топик с именем, которое мы обозначили в коде, кол-во партиций дефолтное (8, взятое из topic_partitions — параметра объекта приложения), так как у нашего топика мы индивидуальное значение (через partitions) не указывали. Запущенному агенту в воркере отведены все 8 партициций, так как он единственный, но об этом будет подробнее в части про кластеринг.
Что же, теперь можем зайти в другое окно терминала и отправить пустое сообщение в наш топик:
P.S. с помощью @ мы показываем, что посылаем сообщение в топик с именем «collect_securities».
В данном случае, сообщение ушло в 6 партицию — это можно проверить, зайдя в kafdrop на localhost:9000
Перейдя в окно терминала с нашим воркером, мы увидим радостное сообщение, посланное с помощью loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Так же, можем заглянуть в mongo (с помощью Robo3T или Studio3T) и увидеть, что ценные бумаги в базе:
Я не миллиардер, а потому, довольствуемся первым вариантом просмотра.
Счастье и радость — первый агент готов 🙂
Агент готов, да здравствует новый агент!
Да, господа, нами пройдена только 1/3 пути, уготованного этой статьёй, но не унывайте, так как сейчас будет уже легче.
Итак, теперь нам нужен агент, который собирает мета информацию и складывает её в документ коллекции:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Так как этот агент будет обрабатывать информацию о конкретной security, нам нужно в сообщении указать тикер (symbol) этой бумаги. Для этого в faust существуют Records — классы, декларирующие схему сообщения в топике агента.
В таком случае перейдём в records.pyи опишем, как должно выглядеть сообщение у этого топика:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Как вы уже могли догадаться, faust для описания схемы сообщения использует аннотацию типов в python, поэтому и минимальная версия, поддерживаемая библиотекой — 3.6.
Как видите, мы передаём в метод инициализации топика новый параметр со схемой — value_type. Далее, всё по той же самой схеме, поэтому останавливаться на чём то ещё — смысла не вижу.
Ну что же, последний штрих — добавим в collect_securitites вызов агента сбора мета информации:
Используем ранее объявлению схему для сообщения. В данном случае, я использовал метод .cast, так как нам не нужно ожидать результат от агента, но стоит упомянуть, что способов послать сообщение в топик:
cast — не блокирует, так как не ожидает результата. Нельзя послать результат в другой топик сообщением.
send — не блокирует, так как не ожидает результата. Можно указать агента в топик которого уйдёт результат.
ask — ожидает результата. Можно указать агента в топик которого уйдёт результат.
Итак, на этом с агентами на сегодня всё!
Команда мечты
Последнее, что я обещал написать в этой части — команды. Как уже говорилось ранее, команды в faust — это обёртка над click. Фактически faust просто присоединяет нашу кастомную команду к своему интерфейсу при указании ключа -A
После объявленных агентов в agents.py добавим функцию с декоратором app.command, вызывающую метод cast у collect_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Таким образом, если мы вызовем список команд, в нём будет и наша новая команда:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Ею мы можем воспользоваться, как любой другой, поэтому перезапустим faust воркер и начнём полноценный сбор ценных бумаг:
> faust -A horton.agents start-collect-securities
Что будет дальше?
В следующей части мы, на примере оставшихся агентов, рассмотрим, механизм sink для поиска экстремум в ценах закрытия торгов за год и cron-запуск агентов.
P.S. Под прошлой частью меня спросили про faust и confluent kafka (какие есть у confluent фичи). Кажется, что confluent во многом функциональнее, но дело в том, что faust не имеет полноценной поддержки клиента для confluent — это следует из описания ограничений клиентов в доке.