Haddaba, qaybta labaad. Sidii hore loogu qoray, waxaan ku samayn doonaa kuwan soo socda:
Aynu ku qorno macmiil yar oo alfavantage ah aiohttp oo ay ku jiraan codsiyada dhamaadka-dhamaadka ee aan u baahanahay.
Сделаем агента, который будет собирать данные о ценных бумагах и мета информацию по ним.
Но, это то, что мы сделаем для самого проекта, а в плане исследования faust мы узнаем, как писать агентов, обрабатывающих стрим событий из kafka, а так же как написать команды (обёртка на click), в нашем случаи — для ручного пуша сообщения в топик, за которым следит агент.
Tababarka
Macmiilka AlphaVantage
Marka hore, aynu u qorno macmiil yar oo aiohttp ah codsiyada alfavantage.
AlphaVantage API si fudud oo qurux badan ayaa loo qaabeeyey, marka waxaan go'aansaday inaan dhammaan codsiyada ku sameeyo habka construct_query где в свою очередь идёт http вызов.
Waxaan keenayaa beeraha oo dhan snake_case для удобства.
Ну и декорация logger.catch для красивого и информативного вывода трейсбека.
P.S. Незабываем локально добавить токен alphavantage в config.yml, либо экспортировать переменную среды HORTON_SERVICE_APIKEY. Получаем токен halkan.
fasalka CRUD
У нас будет коллекция securities для хранения мета информации о ценных бумагах.
Hadda waxaan heli doonaa abuurista codsiga ugu fudud, wax yar ka dib waan ballaarin doonaa, si kastaba ha ahaatee, si aan kuu sugin, halkan референсы App-class. Waxaan sidoo kale kugula talinayaa inaad eegto fasalka dejinta, maadaama ay mas'uul ka tahay inta badan goobaha.
Marka, marka hore waxaan helnaa shayga codsiga degdega ah - waa wax fudud. Marka xigta, waxaan si cad ugu dhawaaqeynaa mawduuc loogu talagalay wakiilkayaga ... Halkan waxaa habboon in la sheego waxa ay tahay, waa maxay qiyaasta gudaha iyo sida tan loo habeyn karo si ka duwan.
Топики в kafka, если мы хотим узнать точное определение, то лучше прочитать off dukumeenti, либо можно прочитать compendium Habré oo Ruush ah, halkaas oo wax waliba si sax ah uga muuqdaan :)
Parameter gudaha, oo si fiican loogu sharraxay dokumentiga faust, wuxuu noo ogolaanayaa inaan mawduuca si toos ah u habeyno koodhka, dabcan, tani waxay ka dhigan tahay cabbirrada ay bixiyaan soo-saareyaasha xun, tusaale ahaan: sii-haynta, siyaasadda sii-haynta (sida caadiga ah tirtir, laakiin waad dejin kartaa isafgaradka), tirada qaybaha mowduuc kasta (qaybood, чтобы сделать, например, меньшее чем глобальное значение приложения faust).
Guud ahaan, wakiilku wuxuu abuuri karaa mawduuc la maareeyay oo leh qiyam caalami ah, si kastaba ha ahaatee, waxaan jeclahay inaan wax walba si cad u sheego. Intaa waxaa dheer, qaar ka mid ah xuduudaha (tusaale, tirada qaybaha ama siyaasadda sii haynta) ee mawduuca ku jira xayeysiiska wakiilka lama habeyn karo.
Waa kuwan sida ay u ekaan karto iyada oo aan gacanta lagu qeexin mawduuca:
Hagaag, hadda aan sharaxno waxa wakiilkeenu samayn doono :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Итак, в начале агента мы открываем aiohttp сессию для запросов через наш клиент. Таким образом, при запуске воркера, когда будет запущен наш агент, сразу же будет открыта сессия — одна, на всё время работы воркера (или несколько, если изменить параметр isku-duwidda у агента с дефолтной единички).
Далее, мы идём по стриму (сообщение мы помещаем в _, maadaama anaga, wakiilkan, aan dan ka lahayn nuxurka) ee fariimaha mawduuceena, haddii ay ku jiraan xajinta hadda, haddii kale wareeggayagu wuxuu sugi doonaa imaatinkooda. Hagaag, gudaha loop-keena, waxaanu galnaa risiidhka fariinta, waxaanu helnaa liis firfircoon (get_securities soo celinta oo kaliya firfircoonida caadiga ah, eeg code-ka macmiilka) oo kaydiya kaydinta xogta, anagoo hubinayna haddii uu jiro ammaan leh calaamad isku mid ah iyo ku beddelashada kaydka xogta , haddii ay jirto, markaa (warqad) si fudud ayaa loo cusboonaysiin doonaa.
Aan bilowno abuurkeena!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
Tilmaamaha PS qaybta webka Ma tixgelin doono khaladka maqaallada, markaa waxaan dejineynaa calanka ku habboon.
В нашей команде запуска мы указали faust’у, где искать объект приложения и что делать с ним (запустить воркер) с уровнем вывода логов info. Получаем следующий вывод:
Aynu eegno qaybta qaybinta. Sida aan arki karno, mowduuc ayaa la sameeyay magaca aan ku magacownay koodhka, tirada qaybta caadiga ah (8, laga soo qaatay mawduuc_qodob — параметра объекта приложения), так как у нашего топика мы индивидуальное значение (через partitions) не указывали. Запущенному агенту в воркере отведены все 8 партициций, так как он единственный, но об этом будет подробнее в части про кластеринг.
Что же, теперь можем зайти в другое окно терминала и отправить пустое сообщение в наш топик:
P.S. с помощью @ waxaan tuseynaa inaan fariin u direyno mowduuc la yiraahdo "collect_securities".
Xaaladdan oo kale, fariintu waxay tagtay qaybta 6 - waxaad ku hubin kartaa tan adoo aadaya kafdrop on localhost:9000
Markaan shaqaalahayada la aadno daaqadda terminalka, waxaan arki doonaa fariin farxad leh oo loo diray loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Так же, можем заглянуть в mongo (с помощью Robo3T или Studio3T) и увидеть, что ценные бумаги в базе:
Anigu ma ihi bilyaneer, sidaas darteed waxaan ku qanacsanahay ikhtiyaarka daawashada koowaad.
Счастье и радость — первый агент готов 🙂
Агент готов, да здравствует новый агент!
Да, господа, нами пройдена только 1/3 пути, уготованного этой статьёй, но не унывайте, так как сейчас будет уже легче.
Итак, теперь нам нужен агент, который собирает мета информацию и складывает её в документ коллекции:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Так как этот агент будет обрабатывать информацию о конкретной security, нам нужно в сообщении указать тикер (symbol) этой бумаги. Для этого в faust существуют Records - fasallada ku dhawaaqaya nidaamka fariinta mawduuca wakiilka.
Xaaladdan oo kale, aan aado diiwaanada.pyи опишем, как должно выглядеть сообщение у этого топика:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Как вы уже могли догадаться, faust для описания схемы сообщения использует аннотацию типов в python, поэтому и минимальная версия, поддерживаемая библиотекой — 3.6.
Aan ku soo laabano wakiilka, dejino noocyada oo aan ku darno:
Как видите, мы передаём в метод инициализации топика новый параметр со схемой — value_type. Далее, всё по той же самой схеме, поэтому останавливаться на чём то ещё — смысла не вижу.
Hagaag, taabashada kama dambaysta ahi waa in lagu daro wicitaan wakiilka xog ururinta meta si ay u ururiyaan_securitites:
Waxaan fariinta u isticmaalnaa nidaamka hore loo sheegay. Xaaladdan oo kale, waxaan isticmaalay habka .cast tan iyo markii aan u baahnayn inaan sugno natiijada wakiilka, laakiin waxaa habboon in la sheego taas siyaabo fariin u dir mawduuca:
kabka - ma xannibo sababtoo ah ma filayo natiijo. Uma diri kartid natiijada mawduuc kale fariin ahaan.
dir - ma xannibo sababtoo ah ma filayo natiijo. Waxaad ku qeexi kartaa wakiilka mawduuca ay natiijadu aadi doonto.
weydii - waxay sugaysaa natiijo. Waxaad ku qeexi kartaa wakiilka mawduuca ay natiijadu aadi doonto.
Итак, на этом с агентами на сегодня всё!
Kooxda riyada
Последнее, что я обещал написать в этой части — команды. Как уже говорилось ранее, команды в faust — это обёртка над click. Фактически faust просто присоединяет нашу кастомную команду к своему интерфейсу при указании ключа -A
Ka dib markii wakiilada lagu dhawaaqay in wakiilada.py ku dar shaqo leh qurxin app.command, вызывающую метод tuuro у ururin_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Таким образом, если мы вызовем список команд, в нём будет и наша новая команда:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Waxaan u isticmaali karnaa sida qof kasta oo kale, markaa aan dib u bilowno shaqaalaha khaldan oo aan bilowno ururinta dammaanadda oo dhammaystiran:
> faust -A horton.agents start-collect-securities
Maxaa xigi doona?
Qaybta soo socota, annagoo adeegsanayna wakiillada soo hadhay tusaale ahaan, waxaynu tixgelin doonaa habka saxanka ee lagu raadinayo meelaha ugu daran ee qiimaha xidhitaanka ganacsiga ee sanadka iyo furitaanka wakiilada.