Дык вось, другая частка. Як і пісалася раней, у ёй мы зробім наступнае:
Напішам невялікі кліентык для alphavantage на aiohttp з запытамі на патрэбныя нам эндпаінты.
Зробім агента, які будзе збіраць дадзеныя аб каштоўных паперах і мэта інфармацыю па іх.
Але, гэта тое, што мы зробім для самога праекту, а ў плане даследавання faust мы даведаемся, як пісаць агентаў, якія апрацоўваюць стрым падзей з kafka, а гэтак жа як напісаць каманды (абгортка на click), у нашым выпадкі – для ручнога пуша паведамлення у топік, за якім сочыць агент.
Падрыхтоўка
Кліент AlphaVantage
Для пачатку, напішам невялікі aiohttp кліентык для запытаў на alphavantage.
API AlphaVantage дастаткова проста і прыгожа спраектавана, таму ўсе запыты я вырашыў праводзіць праз метад construct_query дзе ў сваю чаргу ідзе http выклік.
Усе палі я прыводжу да snake_case для зручнасці.
Ну і дэкарацыя logger.catch для прыгожай і інфарматыўнай высновы трэйсбека.
PS Незабыўны лакальна дадаць токен alphavantage у config.yml, альбо экспартаваць зменную асяроддзі HORTON_SERVICE_APIKEY. Атрымліваем токен тут.
CRUD-клас
У нас будзе калекцыя securities для захоўвання мета інфармацыі аб каштоўных паперах.
Пакуль у нас будзе самае простае стварэнне прыкладання, крыху пазней мы яго пашырым, аднак, каб не прымушаць вас чакаць, вось рэферэнсы на App-клас. На клас settings таксама раю зірнуць, бо менавіта ён адказвае за большую частку налад.
Так, спачатку атрымліваем аб'ект faust-прыкладанні - гэта досыць проста. Далей, мы відавочна аб'яўляем топік для нашага агента… Тут варта згадаць, што гэта такое, што за параметр internal і як гэта можна задаволіць па-іншаму.
Топікі ў kafka, калі мы хочам даведацца дакладнае вызначэнне, то лепш прачытаць оф. доку, альбо можна прачытаць кампендыум на хабры на рускай, дзе гэтак жа ўсё дастаткова дакладна адлюстравана 🙂
Параметр internal, досыць добра апісаны ў доку faust, дазваляе нам наладжваць топік прама ў кодзе, натуральна, маюцца на ўвазе параметры, прадугледжаныя распрацоўшчыкамі faust, напрыклад: retention, retention policy (па-змаўчанні delete, але можна ўсталяваць і кампактны), кол-ць партыцый на топік (мноства, каб зрабіць, напрыклад, меншае чым глабальнае значэнне прыкладанні faust).
Наогул, агент можа ствараць сам кіраваны топік з глабальнымі значэннямі, аднак, я кахаю аб'яўляць усё відавочна. Да таго ж, некаторыя параметры (напрыклад, кол-ць партыцый ці retention policy) топіка ў аб'яве агента наладзіць нельга.
Вось як гэта магло было выглядаць без ручнога азначэння топіка:
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Такім чынам, у пачатку агента мы адчыняем aiohttp сесію для запытаў праз наш кліент. Такім чынам, пры запуску воркера, калі будзе запушчаны наш агент, адразу ж будзе адкрыта сесія - адна, на ўвесь час працы воркера (ці некалькі, калі змяніць параметр concurrency у агента з дэфолтнай адзінкі).
Далей, мы ідзем па стрыму (паведамленне мы змяшчаем у _, так як нам, у дадзеным агенце, абыякава змест) паведамленняў з нашага топіка, калі яны ёсць пры бягучым зруху (offset), інакш, наш цыкл будзе чакаць іх паступлення. Ну а ўсярэдзіне нашага цыклу, мы лагуем паступленне паведамлення, атрымліваем спіс актыўных (get_securities вяртае па-змаўчанні толькі active, гл. код кліента) каштоўных папер і захоўваем яго ў базу, правяраючы пры гэтым, ці ёсць папера з такім Біржавы сімвал і біржай у БД , калі ёсць, то яна (папера) проста абновіцца.
Запусцім наша тварэнне!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS Магчымасці вэб-кампанента faust я разглядаць у артыкулах не буду, таму выстаўляемы адпаведны сцяг.
У нашай камандзе запуску мы паказалі faust'у, дзе шукаць аб'ект прыкладання і што рабіць з ім (запусціць воркер) з узроўнем вываду логаў info. Атрымліваем наступную выснову:
Паглядзім на partition set. Як мы бачым, быў створаны топік з імем, якое мы пазначылі ў кодзе, у партый дэфолтнае (8, узятае з topic_partitions - Параметра аб'екта прыкладання), так як у нашага топіка мы індывідуальнае значэнне (праз partitions) не паказвалі. Запушчанаму агенту ў воркеры адведзены ўсе 8 партыцыцый, бо ён адзіны, але пра гэта будзе падрабязней у частцы пра кластэрынг.
Што ж, зараз можам зайсці ў іншае акно тэрмінала і адправіць пустое паведамленне ў наш топік:
PS з дапамогай @ мы паказваем, што дасылаем паведамленне ў топік з імем «collect_securities».
У дадзеным выпадку, паведамленне сышло ў 6 партыцыю - гэта можна праверыць, зайшоўшы ў kafdrop на localhost:9000
Пяройдучы ў акно тэрмінала з нашым воркерам, мы ўбачым радаснае паведамленне, дасланае з дапамогай loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Гэтак жа, можам зазірнуць у mongo (з дапамогай Robo3T ці Studio3T) і ўбачыць, што каштоўныя паперы ў базе:
Я не мільярдэр, а таму, здавольваемся першым варыянтам прагляду.
Шчасце і радасць - першы агент гатовы 🙂
Агент гатовы, ды жыве новы агент!
Так, спадары, намі пройдзена толькі 1/3 шляху, прыгатаванага гэтым артыкулам, але не падайце духам, бо цяпер будзе ўжо лягчэй.
Такім чынам, зараз нам патрэбен агент, які збірае мета інфармацыю і складае яе ў дакумент калекцыі:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Так як гэты агент будзе апрацоўваць інфармацыю аб канкрэтнай security, нам трэба ў паведамленні ўказаць Біржавы сімвал (symbol) гэтай паперы. Для гэтага ў faust існуюць Ўлік - Класы, якія дэкларуюць схему паведамлення ў топіцы агента.
У такім выпадку пяройдзем у records.pyі апішам, як павінна выглядаць паведамленне ў гэтага топіка:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Як вы ўжо маглі здагадацца, faust для апісання схемы паведамлення выкарыстоўвае анатацыю тыпаў у python, таму і мінімальная версія, якая падтрымліваецца бібліятэкай. 3.6.
Як бачыце, мы перадаем у метад ініцыялізацыі топіка новы параметр са схемай - value_type. Далей, усё па той жа самай схеме, таму спыняцца на чым тое яшчэ - сэнсу не бачу.
Ну што ж, апошні штрых - дадамо ў collect_securitites выклік агента збору мета інфармацыі:
Выкарыстоўваны раней аб'яве схему для паведамлення. У дадзеным выпадку, я выкарыстаў метад .cast, бо нам не трэба чакаць вынік ад агента, але варта згадаць, што спосабаў паслаць паведамленне ў топік:
cast - не блакуе, так як не чакае выніку. Нельга даслаць вынік у іншы топік паведамленнем.
send - не блакуе, так як не чакае выніку. Можна паказаць агента ў топік якога сыдзе вынік.
ask - чакае выніку. Можна паказаць агента ў топік якога сыдзе вынік.
Дык вось, на гэтым з агентамі на сёння ўсё!
Каманда мары
Апошняе, што я абяцаў напісаць у гэтай частцы - каманды. Як ужо гаварылася раней, каманды ў faust - гэта абгортка над click. Фактычна faust проста далучае нашу кастамную каманду да свайго інтэрфейсу пры ўказанні ключа -A
Пасля абвешчаных агентаў у agents.py дадамо функцыю з дэкаратарам app.command, якая выклікае метад кінуць у collect_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Такім чынам, калі мы выклічам спіс каманд, у ім будзе і наша новая каманда:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Ёю мы можам скарыстацца, як любы іншы, таму перазапусцім faust воркер і пачнем паўнавартасны збор каштоўных папер:
> faust -A horton.agents start-collect-securities
Што будзе далей?
У наступнай частцы мы, на прыкладзе пакінутых агентаў, разгледзім, механізм sink для пошуку экстрэмуму ў коштах зачынення таргоў за год і cron-запуск агентаў.
PS Пад мінулай часткай мяне спыталі пра faust і confluent kafka (якія ёсць у confluent фічы). Здаецца, што confluent шмат у чым функцыянальней, але справа ў тым, што faust не мае паўнавартаснай падтрымкі кліента для confluent – гэта вынікае з апісання абмежаванняў кліентаў у доку.