Така, така, втората част. Както беше написано по-рано, в него ще направим следното:
Нека напишем малък клиент за alphavantage на aiohttp със заявки за крайните точки, от които се нуждаем.
Нека създадем агент, който ще събира данни за ценни книжа и мета информация за тях.
Но това е, което ще направим за самия проект и по отношение на изследването на faust, ще се научим как да пишем агенти, които обработват събития от kafka, както и как да пишем команди (click wrapper), в нашия случай - за ръчно насочени съобщения към темата, която агентът наблюдава.
Обучение
AlphaVantage клиент
Първо, нека напишем малък aiohttp клиент за заявки към alphavantage.
AlphaVantage API е доста прост и красиво проектиран, така че реших да направя всички заявки чрез метода construct_query където на свой ред има http повикване.
Нося всички полета snake_case за комфорт.
Е, декорацията logger.catch за красив и информативен изход за проследяване.
PS Не забравяйте да добавите маркера alphavantage локално към config.yml или да експортирате променливата на средата HORTON_SERVICE_APIKEY. Получаваме жетон тук.
CRUD клас
Ще имаме колекция от ценни книжа, за да съхраняваме мета информация за ценни книжа.
Засега ще имаме най-простото създаване на приложение, малко по-късно ще го разширим, но за да не ви караме да чакате, тук препратки към App-клас. Също така ви съветвам да разгледате класа на настройките, тъй като той отговаря за повечето настройки.
Основна част
Агент за събиране и поддържане на списък с ценни книжа
И така, първо получаваме обекта на приложението faust - това е доста просто. След това изрично декларираме тема за нашия агент... Тук си струва да споменем какво представлява, какъв е вътрешният параметър и как това може да бъде подредено по различен начин.
Темите в kafka, ако искаме да знаем точната дефиниция, по-добре е да прочетем изключено. документ, или можете да прочетете абстрактен на хъба на руски, където всичко също е отразено доста точно :)
Параметър вътрешен, описан доста добре в faust doc, ни позволява да конфигурираме темата директно в кода, разбира се, това означава параметрите, предоставени от разработчиците на faust, например: задържане, политика за задържане (по подразбиране изтриване, но можете да зададете компактен), брой дялове на тема (преградида направите, например, по-малко от световно значение приложения faust).
По принцип агентът може да създаде управлявана тема с глобални стойности, но обичам да декларирам всичко изрично. Освен това някои параметри (например броя на дяловете или политиката за задържане) на темата в рекламата на агента не могат да бъдат конфигурирани.
Ето как може да изглежда без ръчно дефиниране на темата:
Е, сега нека опишем какво ще направи нашият агент :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
И така, в началото на агента отваряме aiohttp сесия за заявки през нашия клиент. Така при стартиране на воркър, когато се стартира нашият агент, веднага ще се отвори сесия - една, за цялото време, докато работи воркърът (или няколко, ако промените параметъра едновременност от агент с единица по подразбиране).
След това следваме потока (поставяме съобщението в _, тъй като ние, в този агент, не се интересуваме от съдържанието) на съобщенията от нашата тема, ако те съществуват в текущото отместване, в противен случай нашият цикъл ще изчака пристигането им. Е, вътре в нашия цикъл регистрираме получаването на съобщението, получаваме списък с активни (get_securities връща само активни по подразбиране, вижте клиентския код) ценни книжа и го записваме в базата данни, като проверяваме дали има ценна книга със същия тикер и обмен в базата данни, ако има, тогава тя (хартията) просто ще бъде актуализирана.
Нека стартираме нашето творение!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS Характеристики уеб компонент Няма да разглеждам фауст в статиите, така че поставихме съответния флаг.
В нашата команда за стартиране казахме на faust къде да търси обекта на приложението и какво да прави с него (стартира работно средство) с изходното ниво на информационния журнал. Получаваме следния изход:
Нека разгледаме комплекта дялове. Както виждаме, беше създадена тема с името, което посочихме в кода, броят дялове по подразбиране (8, взето от topic_partitions - параметър на обект на приложение), тъй като не сме посочили индивидуална стойност за нашата тема (чрез дялове). На стартирания агент в работника се присвояват всичките 8 дяла, тъй като той е единственият, но това ще бъде обсъдено по-подробно в частта за клъстерирането.
Е, сега можем да отидем до друг терминален прозорец и да изпратим празно съобщение до нашата тема:
PS използване @ показваме, че изпращаме съобщение до тема с име „collect_securities“.
В този случай съобщението отиде в дял 6 - можете да проверите това, като отидете на kafdrop on localhost:9000
Отивайки до прозореца на терминала с нашия работник, ще видим щастливо съобщение, изпратено чрез loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Можем също да разгледаме mongo (използвайки Robo3T или Studio3T) и да видим, че ценните книжа са в базата данни:
Аз не съм милиардер и затова се задоволяваме с първата опция за гледане.
Щастие и радост - първият агент е готов :)
Агент готов, да живее новият агент!
Да, господа, изминахме само 1/3 от пътя, подготвен от тази статия, но не се обезсърчавайте, защото сега ще бъде по-лесно.
Така че сега имаме нужда от агент, който събира мета информация и я поставя в документ за събиране:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Тъй като този агент ще обработва информация за конкретна ценна книга, трябва да посочим тикера (символа) на тази ценна книга в съобщението. За тази цел във фауст има Records — класове, които декларират схемата на съобщението в темата на агента.
В този случай нека да отидем на records.pyи опишете как трябва да изглежда съобщението за тази тема:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Както може би се досещате, faust използва анотацията тип python, за да опише схемата на съобщението, поради което минималната версия, поддържана от библиотеката, е 3.6.
Да се върнем към агента, да зададем типовете и да го добавим:
Както можете да видите, предаваме нов параметър със схема към метода за инициализация на темата - value_type. Освен това всичко следва същата схема, така че не виждам смисъл да се спирам на нещо друго.
Е, последният щрих е да добавите извикване към агента за събиране на мета информация към collect_securitites:
Използваме предварително обявената схема за съобщение. В този случай използвах метода .cast, тъй като не е необходимо да чакаме резултата от агента, но си струва да споменем, че начини изпрати съобщение до темата:
cast - не блокира, защото не очаква резултат. Не можете да изпратите резултата в друга тема като съобщение.
изпрати - не блокира, защото не очаква резултат. Можете да посочите агент в темата, към която да отиде резултатът.
питай - чака резултат. Можете да посочите агент в темата, към която да отиде резултатът.
И така, това е всичко с агентите за днес!
Мечтаният отбор
Последното нещо, което обещах да напиша в тази част са командите. Както споменахме по-рано, командите във faust са обвивка около кликване. Всъщност faust просто прикачва нашата персонализирана команда към своя интерфейс, когато указва ключа -A
След обявените агенти в агенти.py добавете функция с декоратор app.commandизвикване на метода хвърлиха у collect_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Така, ако извикаме списъка с команди, нашата нова команда ще бъде в него:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Можем да го използваме като всеки друг, така че нека рестартираме faust worker и да започнем пълноценно събиране на ценни книжа:
> faust -A horton.agents start-collect-securities
Какво ще стане след това?
В следващата част, като използваме останалите агенти като пример, ще разгледаме механизма за потъване за търсене на крайности в цените на затваряне на търговията за годината и стартирането на cron на агенти.