Отже, друга частина. Як і писалося раніше, у ній ми зробимо таке:
Напишемо невеликий клієнтик для alphavantage на aiohttp із запитами на потрібні нам ендпоінти.
Зробимо агента, який збиратиме дані про цінні папери та мета інформацію щодо них.
Але це те, що ми зробимо для самого проекту, а в плані дослідження faust ми дізнаємося, як писати агентів, які обробляють стрим подій з kafka, а так само як написати команди (обгортка на click), у нашому випадку — для ручного пуша повідомлення у топік, за яким стежить агент.
Підготовка
Клієнт AlphaVantage
Для початку напишемо невеликий aiohttp клієнтик для запитів на alphavantage.
Поки в нас буде найпростіше створення програми, трохи пізніше ми його розширимо, однак, щоб не змушувати вас чекати, ось референси на App-клас. На клас settings теж раджу подивитись, тому що саме він відповідає за більшу частину налаштувань.
Так, спочатку отримуємо об'єкт faust-додатку – це досить просто. Далі, ми явно оголошуємо топік для нашого агента ... Тут варто згадати, що це таке, що за параметр internal і як це можна влаштувати по-іншому.
Топики в kafka, якщо ми хочемо дізнатися точне визначення, краще прочитати оф. доку, або можна прочитати конспект на хабре російською, де так само все досить точно відображено 🙂
Параметр internal, Досить добре описаний в доці faust, дозволяє нам налаштовувати топік прямо в коді, природно, маються на увазі параметри, передбачені розробниками faust, наприклад: retention, retention policy (за замовчуванням delete, але можна встановити і компактний), кількість партицій на топік (безліч, щоб зробити, наприклад, менше ніж глобальне значення програми faust).
Взагалі, агент може створювати сам керований топік з глобальними значеннями, проте, я люблю оголошувати все явно. До того ж, деякі параметри (наприклад, кількість партицій або retention policy) топіка в оголошенні агента налаштувати не можна.
Ось як це могло виглядати без ручного визначення топіка:
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Отже, спочатку агента ми відкриваємо aiohttp сесію для запитів через наш клієнт. Таким чином, при запуску воркера, коли буде запущено наш агент, відразу ж буде відкрито сесію — одну, на весь час роботи воркера (або кілька, якщо змінити параметр одночасність у агента з дефолтної одиниці).
Далі, ми йдемо по стриму (повідомлення ми поміщаємо в _, тому що нам, в даному агенті, байдуже зміст) повідомлень з нашого топіка, якщо вони є при поточному зрушенні (offset), інакше, наш цикл чекатиме їх надходження. Ну а всередині нашого циклу, ми логуємо надходження повідомлення, отримуємо список активних цінних паперів і зберігаємо його в базу, перевіряючи при цьому, чи є папір з таким тикером і біржею в БД. якщо є, то вона (папір) просто оновиться.
Запустимо наше творіння!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS Можливості веб-компонента faust я розглядати у статтях не буду, тож виставляємо відповідний прапор.
У нашій команді запуску ми вказали faust'у, де шукати об'єкт програми та що робити з ним (запустити воркер) з рівнем виведення логів info. Отримуємо наступний висновок:
Подивимося на partition set. Як ми бачимо, був створений топік з ім'ям, яке ми позначили в коді, кількість партицій дефолтна (8, взята з topic_partitions - Параметра об'єкта докладання), так як у нашого топіка ми індивідуальне значення (через partitions) не вказували. Запущеному агенту у воркері відведено всі 8 партицицій, оскільки він єдиний, але про це буде детальніше щодо кластерингу.
Що ж, тепер можемо зайти в інше вікно терміналу і відправити порожнє повідомлення до нашого топіка:
PS за допомогою @ ми показуємо, що надсилаємо повідомлення в топік з ім'ям «collect_securities».
В даному випадку, повідомлення пішло в 6 партицію - це можна перевірити, зайшовши в kafdrop localhost:9000
Перейшовши у вікно терміналу з нашим воркером, ми побачимо радісне повідомлення надіслане за допомогою loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Також, можемо заглянути в mongo (за допомогою Robo3T або Studio3T) і побачити, що цінні папери в базі:
Я не мільярдер, а тому задовольняємося першим варіантом перегляду.
Щастя та радість - перший агент готовий 🙂
Агент готовий, нехай живе новий агент!
Так, панове, нами пройдено лише 1/3 шляху, приготованого цією статтею, але не сумуйте, тому що зараз буде вже легше.
Отже, тепер нам потрібен агент, який збирає інформацію і складає її в документ колекції:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Так як цей агент оброблятиме інформацію про конкретну security, нам потрібно в повідомленні вказати тикер цього паперу. Для цього у faust існують Records - Класи, що декларують схему повідомлення в топіці агента.
У такому разі перейдемо в records.pyі опишемо, як має виглядати повідомлення у цього топіка:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Як ви вже могли здогадатися, faust для опису схеми повідомлення використовує анотацію типів у python, тому й мінімальна версія, яку підтримує бібліотека. 3.6.
Повернемося до агента, встановимо типи та допишемо його:
Як бачите, ми передаємо метод ініціалізації топіка новий параметр зі схемою — value_type. Далі, все за тією ж схемою, тому зупинятися на чомусь ще — сенсу не бачу.
Ну що ж, останній штрих — додамо в collect_securitites виклик агента збору інформації:
Використовуємо оголошення раніше схему для повідомлення. В даному випадку, я використовував метод .cast, тому що нам не потрібно очікувати результату від агента, але варто згадати, що способів надіслати повідомлення в топік:
cast — не блокує, оскільки не очікує на результат. Не можна надіслати результат в інший топік повідомленням.
send - не блокує, тому що не очікує результату. Можна вказати агента в топік якого піде результат.
ask - очікує на результат. Можна вказати агента в топік якого піде результат.
Отже, на цьому з агентами сьогодні все!
Команда мрії
Останнє, що я обіцяв написати в цій частині команди. Як уже говорилося раніше, команди faust - це обгортка над click. Фактично faust просто приєднує нашу кастомну команду до свого інтерфейсу за вказівкою ключа -A
Після оголошених агентів у agents.py додамо функцію з декоратором app.command, що викликає метод кинути у collect_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Таким чином, якщо ми викличемо список команд, у ньому буде і наша нова команда:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Нею ми можемо скористатися як будь-який інший, тому перезапустимо faust воркер і почнемо повноцінний збір цінних паперів:
> faust -A horton.agents start-collect-securities
Що буде далі?
У наступній частині ми, на прикладі агентів, що залишилися, розглянемо, механізм sink для пошуку екстремум у цінах закриття торгів за рік і cron-запуск агентів.
PS Під минулою частиною мене запитали про faust та confluent kafka (які є у confluent фічі). Здається, що confluent багато в чому функціональніший, але річ у тому, що faust не має повноцінної підтримки клієнта для confluent — це випливає з описи обмежень клієнтів у доку.