Beraz, beraz, bigarren zatia. Lehen idatzi bezala, bertan honako hau egingo dugu:
Idatzi dezagun bezero txiki bat aiohttp-n alphaavantagerako, behar ditugun amaierako puntuen eskaerarekin.
Sortu dezagun baloreei buruzko datuak eta haiei buruzko meta informazioa bilduko dituen agente bat.
Baina, hau da proiektuarentzat berarentzat egingo duguna, eta faust ikerketari dagokionez, kafkatik korronte-gertaerak prozesatzen dituzten agenteak nola idazten ikasiko dugu, baita komandoak nola idatzi (click wrapper), gure kasuan - agenteak kontrolatzen duen gaiari eskuzko push mezuetarako.
Prestakuntza
AlphaVantage bezeroa
Lehenik eta behin, idatz dezagun aiohttp bezero txiki bat alphaavantage eskaeretarako.
AlphaVantage APIa nahiko sinplea eta ederki diseinatuta dago, beraz, eskaera guztiak metodoaren bidez egitea erabaki nuen construct_query non aldi berean http dei bat dagoen.
Soro guztiak ekartzen ditut snake_case erosotasunerako.
Beno, logger.catch dekorazioa trazeback irteera eder eta informagarrirako.
P.S. Ez ahaztu alphaavantage tokena lokalean gehitzea config.yml-era, edo ingurune-aldagaia esportatzea HORTON_SERVICE_APIKEY. Token bat jasotzen dugu Hemen.
CRUD klasea
Baloreen bilduma bat izango dugu baloreei buruzko meta informazioa gordetzeko.
Oraingoz aplikazio sorrera errazena izango dugu, pixka bat geroago zabalduko dugu, hala ere, itxaroten ez jarraitzeko, hemen erreferentziak App-classera. Ezarpenen klaseari begirada bat ematea ere gomendatzen dizut, ezarpen gehienen arduraduna baita.
РћСЃРЅРѕРІРЅР ° С ° С ° Р ° СЃС‚С °
Baloreen zerrenda biltzeko eta mantentzeko agentea
Beraz, lehenik faust aplikazio objektua lortuko dugu - nahiko erraza da. Jarraian, gure agenteari gai bat esplizituki deklaratzen diogu... Hemen aipatzekoa da zer den, zein den barne-parametroa eta nola antola daitekeen hau desberdin.
Kafkan gaiak, definizio zehatza jakin nahi badugu, hobe irakurtzea itzali. dokumentua, edo irakurri dezakezu konpendioa Habré-n errusieraz, non dena ere nahiko zehatz islatzen den :)
Barneko parametroa, nahiko ondo deskribatuta faust dok, gaia zuzenean konfiguratzeko aukera ematen digu kodean, noski, horrek esan nahi du faust garatzaileek emandako parametroak, adibidez: atxikipena, atxikipen politika (lehenespenez ezabatu, baina ezarri dezakezu. trinkoa), gai bakoitzeko partizio kopurua (puntuazioakegiteko, adibidez, baino gutxiago esangura globala aplikazioak faust).
Oro har, agenteak kudeatutako gai bat sor dezake balio globalekin, hala ere, dena esplizituki deklaratzea gustatzen zait. Gainera, agentearen iragarkiko gaiaren parametro batzuk (adibidez, partizio kopurua edo atxikipen-politika) ezin dira konfiguratu.
Hona hemen gaia eskuz definitu gabe nolakoa izan daitekeen:
Beno, orain deskriba dezagun zer egingo duen gure agenteak :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Beraz, agentearen hasieran, aiohttp saio bat irekitzen dugu gure bezeroaren bidez eskaerak egiteko. Horrela, langile bat abiaraztean, gure agentea abiarazten denean, saio bat berehala irekiko da - bat, langilea martxan dagoen denbora osoan (edo hainbat, parametroa aldatzen baduzu konkurrentzia unitate lehenetsia duen agente batetik).
Jarraian, korrontea jarraitzen dugu (mezua jartzen dugu _, guri, agente honetan, ez zaigulako axola gure gaiko mezuen edukia) uneko desplazamenduan existitzen badira, bestela gure zikloa noiz iritsiko zain egongo da. Beno, gure begizta barruan, mezuaren jasotzea erregistratzen dugu, aktiboen zerrenda bat lortuko dugu (get_securities bakarrik aktibo itzultzen lehenespenez, ikusi bezeroaren kodea) eta datu-basean gordetzen dugu, ticker berdina duen segurtasunik dagoen egiaztatuz eta trukea datu-basean, baldin badago, orduan (papera) besterik gabe eguneratuko da.
Abiarazi dezagun gure sorkuntza!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
P.S. Aukerak web osagaia Artikuluetan ez dut faust kontuan hartuko, beraz, dagokion bandera ezarri dugu.
Gure abiarazteko komandoan, Faust-i esan genion non bilatu behar den aplikazioaren objektua eta zer egin harekin (langile bat abiarazi) info log irteera mailarekin. Irteera hau lortzen dugu:
Ikus dezagun partizio multzoa. Ikus dezakegunez, gai bat sortu zen kodean izendatu genuen izenarekin, partizio-kopuru lehenetsiarekin (8, gaia_partizioak - aplikazioaren objektuaren parametroa), ez baitugu balio indibidual bat zehaztu gure gairako (partizioen bidez). Langilean abiarazitako agenteari 8 partizio guztiak esleitzen zaizkio, bakarra baita, baina hori zehatzago eztabaidatuko da clustering-aren atalean.
Beno, orain beste terminal leiho batera joan gaitezke eta mezu huts bat bidali gure gaiari:
P.S. erabiliz @ mezu bat bidaltzen ari garela erakusten dugu “bildu_segurtasunak” izeneko gai bati.
Kasu honetan, mezua 6. partiziora joan zen; hau egiaztatu dezakezu kafdrop on-era joanez localhost:9000
Gure langilearekin terminaleko leihora joanez, loguru erabiliz bidalitako mezu zoriontsu bat ikusiko dugu:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Mongo-n ere aztertu dezakegu (Robo3T edo Studio3T erabiliz) eta baloreak datu-basean daudela ikus dezakegu:
Ez naiz milioidun bat, eta horregatik konformatzen gara lehen ikusteko aukerarekin.
Zoriona eta poza - lehen agentea prest dago :)
Agente prest, bizi agente berriak!
Bai, jaunak, artikulu honek prestatutako bidearen 1/3 besterik ez dugu egin, baina ez zaitez desanimatu, orain errazagoa izango delako.
Beraz, orain meta informazioa bildu eta bilduma dokumentu batean jartzen duen agente bat behar dugu:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Agente honek segurtasun zehatz bati buruzko informazioa prozesatuko duenez, segurtasun horren ticker (ikurra) adierazi behar dugu mezuan. Horretarako faust-en daude Records — Agentearen gaian mezu-eskema deklaratzen duten klaseak.
Kasu honetan, goazen erregistroak.pyeta deskribatu nolakoa izan behar duen gai honen mezuak:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Asmatuko zenuten bezala, faust-ek python motako oharpena erabiltzen du mezu-eskema deskribatzeko, horregatik liburutegiak onartzen duen gutxieneko bertsioa da. 3.6.
Ikus dezakezunez, eskema batekin parametro berri bat pasatzen diogu gaiaren hasierako metodoari - value_type. Gainera, denak eskema bera jarraitzen du, beraz, ez diot ezertarako balio beste ezertan gelditzeari.
Beno, azken ukitua meta informazioa biltzeko agenteari dei bat gehitzea da collect_securitites:
Aurretik iragarritako eskema erabiltzen dugu mezurako. Kasu honetan, .cast metodoa erabili dut, ez baitugu agentearen emaitzaren zain egon behar, baina aipatzekoa da. moduak bidali mezu bat gaiari:
cast - ez du blokeatzen emaitzarik espero ez duelako. Ezin duzu emaitza beste gai batera bidali mezu gisa.
bidali - ez du blokeatzen emaitzarik espero ez duelako. Emaitza nora joango den gaian eragile bat zehaztu dezakezu.
galdetu - emaitza baten zain dago. Emaitza nora joango den gaian eragile bat zehaztu dezakezu.
Beraz, hori dena gaurko eragileekin!
Dream Team
Zati honetan idazteko agindu dudan azken gauza komandoak dira. Lehen aipatu bezala, faust-eko komandoak klikaren inguruko bilgarri bat dira. Izan ere, faust-ek gure komando pertsonalizatua bere interfazeari eransten dio -A gakoa zehaztean
Iragarritako agenteen ostean agenteak.py gehitu funtzio bat dekoratzaile batekin aplikazioa.agindumetodoa deituz bota у bildu_tituluak:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Horrela, komandoen zerrendari deitzen badiogu, gure komando berria bertan egongo da:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Beste edonork bezala erabil dezakegu, beraz, berrabiarazi dezagun faust langilea eta has gaitezen balore bilduma osoa:
> faust -A horton.agents start-collect-securities
Zer gertatuko da gero?
Hurrengo zatian, gainerako agenteak adibide gisa hartuta, urteko merkataritzaren itxiera-prezioetan muturrak bilatzeko hondoratze-mekanismoa eta agenteen kron-abiaraztearen inguruan hartuko dugu kontuan.
Hori da dena gaurko! Eskerrik asko irakurtzeagatik :)