Shunday qilib, ikkinchi qism. Yuqorida yozilganidek, unda biz quyidagilarni qilamiz:
Bizga kerakli so'nggi nuqtalar uchun so'rovlar bilan aiohttp-da alfavantage uchun kichik mijoz yozamiz.
Keling, qimmatli qog'ozlar va ular bo'yicha meta-ma'lumotlar to'g'risidagi ma'lumotlarni to'playdigan agent yarataylik.
Ammo, biz loyihaning o'zi uchun nima qilamiz va faust tadqiqoti nuqtai nazaridan, biz kafkadan oqim hodisalarini qayta ishlaydigan agentlarni qanday yozishni, shuningdek buyruqlarni qanday yozishni (o'rashni bosish), bizning holatlarimizda - agent kuzatayotgan mavzuga qo'lda surish xabarlari uchun.
o'quv
AlphaVantage mijozi
Birinchidan, alfavantga so'rovlar uchun kichik aiohttp mijozini yozamiz.
AlphaVantage API juda sodda va chiroyli tarzda ishlab chiqilgan, shuning uchun men barcha so'rovlarni ushbu usul orqali amalga oshirishga qaror qildim construct_query bu erda o'z navbatida http chaqiruvi mavjud.
Men barcha dalalarni olib kelaman snake_case qulaylik uchun.
Xo'sh, chiroyli va ma'lumot beruvchi traceback chiqishi uchun logger.catch bezak.
P.S. Alfavantage tokenini mahalliy ravishda config.yml ga qo'shishni yoki muhit o'zgaruvchisini eksport qilishni unutmang. HORTON_SERVICE_APIKEY. Biz token olamiz shu yerda.
CRUD klassi
Qimmatli qog'ozlar haqidagi meta-ma'lumotni saqlash uchun qimmatli qog'ozlar to'plamiga ega bo'lamiz.
Hozircha bizda eng oddiy dastur yaratish bo'ladi, birozdan keyin uni kengaytiramiz, ammo sizni kuttirmaslik uchun bu erda havolalar Ilova sinfiga. Shuningdek, sozlamalar sinfini ko'rib chiqishni maslahat beraman, chunki u ko'pgina sozlamalar uchun javobgardir.
Asosiy qism
Qimmatli qog'ozlar ro'yxatini yig'ish va yuritish bo'yicha agent
Shunday qilib, birinchi navbatda biz faust dastur ob'ektini olamiz - bu juda oddiy. Keyinchalik, biz agentimiz uchun mavzuni aniq e'lon qilamiz ... Bu erda nima ekanligini, ichki parametr nima ekanligini va buni qanday qilib boshqacha tartibga solish mumkinligini eslatib o'tish kerak.
Kafkadagi mavzular, agar aniq ta'rifni bilmoqchi bo'lsak, o'qiganimiz ma'qul o'chirilgan. hujjat, yoki siz o'qishingiz mumkin kompendium Habré-da rus tilida, u erda hamma narsa juda aniq aks ettirilgan :)
Ichki parametr, faust doc-da juda yaxshi tasvirlangan, mavzuni to'g'ridan-to'g'ri kodda sozlash imkonini beradi, albatta, bu faust ishlab chiquvchilari tomonidan taqdim etilgan parametrlarni anglatadi, masalan: saqlash, saqlash siyosati (odatda o'chirish, lekin siz o'rnatishingiz mumkin) zich), mavzu bo'yicha bo'limlar soni (qismlarmasalan, kamroq qilish global ahamiyatga ega ilovalar faust).
Umuman olganda, agent global qadriyatlar bilan boshqariladigan mavzuni yaratishi mumkin, ammo men hamma narsani aniq e'lon qilishni yaxshi ko'raman. Bundan tashqari, agent reklamasidagi mavzuning ba'zi parametrlarini (masalan, bo'limlar soni yoki saqlash siyosati) sozlab bo'lmaydi.
Mavzuni qo'lda belgilamasdan turib u qanday ko'rinishi mumkin:
Xo'sh, endi agentimiz nima qilishini tasvirlab beraylik :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Shunday qilib, agentning boshida biz mijozimiz orqali so'rovlar uchun aiohttp sessiyasini ochamiz. Shunday qilib, ishchini ishga tushirganda, bizning agentimiz ishga tushirilganda, darhol sessiya ochiladi - bitta, ishchi butun vaqt davomida (yoki bir nechta, agar siz parametrni o'zgartirsangiz). bir vaqtda standart birlikka ega agentdan).
Keyinchalik, biz oqimni kuzatib boramiz (biz xabarni joylashtiramiz _, chunki biz ushbu agentda mavzuimizdagi xabarlarning mazmuni haqida qayg'urmaymiz, agar ular joriy ofsetda mavjud bo'lsa, aks holda bizning tsiklimiz ularning kelishini kutadi. Xo'sh, bizning tsiklimiz ichida biz xabarni qabul qilishni qayd qilamiz, faol (get_securities faqat sukut bo'yicha faol bo'ladi, mijoz kodini ko'ring) ro'yxatini olamiz va uni ma'lumotlar bazasiga saqlaymiz, xuddi shu ticker bilan xavfsizlik mavjudligini tekshiramiz va ma'lumotlar bazasida almashinuv , agar mavjud bo'lsa, u (qog'oz) shunchaki yangilanadi.
Keling, ijodimizni boshlaymiz!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
P.S. Imkoniyatlar veb komponenti Men maqolalarda faustni ko'rib chiqmayman, shuning uchun biz tegishli bayroqni o'rnatdik.
Ishga tushirish buyrug'imizda biz faustga dastur ob'ektini qayerdan qidirishni va u bilan nima qilish kerakligini (ishchini ishga tushirish) ma'lumotlar jurnalining chiqish darajasi bilan aytdik. Biz quyidagi natijani olamiz:
Keling, bo'limlar to'plamini ko'rib chiqaylik. Ko'rib turganimizdek, mavzu biz kodda belgilagan nom bilan yaratilgan, standart bo'limlar soni (8, dan olingan). mavzu_bo'limlari - dastur ob'ekti parametri), chunki biz mavzuimiz uchun individual qiymatni belgilamadik (bo'limlar orqali). Ishchida ishga tushirilgan agentga barcha 8 ta bo'lim tayinlangan, chunki u yagona, ammo bu klasterlash bo'limida batafsilroq muhokama qilinadi.
Xo'sh, endi biz boshqa terminal oynasiga o'tishimiz va mavzuimizga bo'sh xabar yuborishimiz mumkin:
P.S. yordamida @ biz "to'plash_qimmatli qog'ozlar" nomli mavzuga xabar yuborayotganimizni ko'rsatamiz.
Bunday holda, xabar 6-bo'limga o'tdi - buni kafdrop-ga o'tish orqali tekshirishingiz mumkin localhost:9000
Ishchimiz bilan terminal oynasiga borib, loguru yordamida yuborilgan baxtli xabarni ko'ramiz:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Shuningdek, biz mongo-ga (Robo3T yoki Studio3T-dan foydalanib) qarashimiz va qimmatli qog'ozlar ma'lumotlar bazasida ekanligini ko'rishimiz mumkin:
Men milliarder emasman, shuning uchun biz birinchi ko'rish variantidan mamnunmiz.
Baxt va quvonch - birinchi agent tayyor :)
Agent tayyor, yangi agent yashasin!
Ha, janoblar, biz ushbu maqola tomonidan tayyorlangan yo'lning atigi 1/3 qismini bosib o'tdik, ammo tushkunlikka tushmang, chunki endi bu osonroq bo'ladi.
Endi bizga meta-ma'lumotni to'playdigan va uni yig'ish hujjatiga joylashtiradigan agent kerak:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Ushbu agent ma'lum bir qimmatli qog'oz haqidagi ma'lumotlarni qayta ishlaganligi sababli, biz xabarda ushbu qimmatli qog'ozning belgisini (ramzini) ko'rsatishimiz kerak. Buning uchun faustda mavjud Records — agent mavzusida xabarlar sxemasini e'lon qiluvchi sinflar.
Bunday holda, keling records.pyva ushbu mavzu uchun xabar qanday ko'rinishini tasvirlab bering:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Siz taxmin qilganingizdek, faust xabar sxemasini tavsiflash uchun python tipidagi izohdan foydalanadi, shuning uchun kutubxona tomonidan qo'llab-quvvatlanadigan minimal versiya. 3.6.
Keling, agentga qaytaylik, turlarni o'rnatamiz va uni qo'shamiz:
Ko'rib turganingizdek, mavzuni ishga tushirish usuliga sxema bilan yangi parametrni o'tkazamiz - value_type. Bundan tashqari, hamma narsa bir xil sxema bo'yicha amalga oshiriladi, shuning uchun men boshqa narsa haqida o'ylashning ma'nosini ko'rmayapman.
Yakuniy teginish - bu collect_securitites uchun meta-ma'lumot yig'ish agentiga qo'ng'iroq qilish:
Biz xabar uchun avval e'lon qilingan sxemadan foydalanamiz. Bu holatda men .cast usulidan foydalandim, chunki agentdan natija kutishimiz shart emas, lekin shuni ta'kidlash joizki yo'llari mavzuga xabar yuboring:
quyma - bloklanmaydi, chunki u natija kutmaydi. Natijani boshqa mavzuga xabar sifatida yubora olmaysiz.
yuborish - bloklanmaydi, chunki u natija kutmaydi. Natija ketadigan mavzuda agentni belgilashingiz mumkin.
so'rang - natijani kutadi. Natija ketadigan mavzuda agentni belgilashingiz mumkin.
Demak, agentlar bilan bugun hammasi!
Orzular jamoasi
Bu qismda yozishga va'da bergan oxirgi narsa - bu buyruqlar. Yuqorida aytib o'tilganidek, faust-dagi buyruqlar bosish atrofida o'ralgan. Haqiqatan ham, faust -A tugmachasini ko'rsatganda, bizning maxsus buyruqimizni o'z interfeysiga biriktiradi
E'lon qilingan agentlardan keyin agents.py dekorator bilan funksiya qo'shing app.commandusulni chaqirish quyma у yig'ish_xavfsizliklari:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Shunday qilib, agar biz buyruqlar ro'yxatini chaqirsak, unda bizning yangi buyruqimiz bo'ladi:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Biz uni boshqalar kabi ishlatishimiz mumkin, shuning uchun faust ishchisini qayta ishga tushiramiz va qimmatli qog'ozlarning to'liq to'plamini boshlaymiz:
> faust -A horton.agents start-collect-securities
Keyingi nima bo'ladi?
Keyingi qismda, qolgan agentlardan misol sifatida foydalanib, biz yil davomida savdoning yopilish narxlarida ekstremallarni qidirish mexanizmini va agentlarni ishga tushirishni ko'rib chiqamiz.
P.S. Oxirgi qism ostida mendan faust va qo'shilgan kafka haqida so'rashdi (konfluent qanday xususiyatlarga ega?). Ko'rinishidan, konfluent ko'p jihatdan ko'proq funktsionaldir, ammo haqiqat shundaki, faust konfluent uchun mijozlarni to'liq qo'llab-quvvatlamaydi - bu shundan kelib chiqadi. hujjatdagi mijoz cheklovlarining tavsifi.