Allora, cusì, a seconda parte. Comu scrittu prima, in questu faremu i seguenti:
Scrivemu un picculu cliente per alphaavantage in aiohttp cù richieste per l'endpoints chì avemu bisognu.
Creemu un agentu chì cullighjarà dati nantu à i tituli è meta infurmazione nantu à elli.
Ma, questu hè ciò chì faremu per u prughjettu stessu, è in quantu à a ricerca di faust, avemu da amparà à scrive l'agenti chì processanu l'avvenimenti di flussu da kafka, è ancu cumu scrive cumandamenti (cliccate wrapper), in u nostru casu - per i missaghji push manuali à u tema chì l'agente hè monitoratu.
A preparazione di
Client AlphaVantage
Prima, scrivemu un picculu cliente aiohttp per richieste à alphaavantage.
L'API AlphaVantage hè abbastanza simplice è bellu cuncepitu, cusì aghju decisu di fà tutte e dumande attraversu u metudu construct_query induve à u turnu ci hè una chjama http.
Aghju purtatu tutti i campi à snake_case per cunfortu.
Ebbè, a decorazione di logger.catch per una bella è informativa traceback output.
PS Ùn vi scurdate di aghjunghje u token alphaavantage in u locu à config.yml, o esportà a variabile di l'ambiente HORTON_SERVICE_APIKEY. Ricevemu un token ccà.
Classe CRUD
Averemu una cullizzioni di securities per almacenà meta infurmazione nantu à i securities.
Per avà averemu a creazione di l'applicazioni più simplice, un pocu dopu l'ampliemu, però, per ùn fà micca aspittà, quì riferimenti à App-class. Vi cunsigliu ancu di piglià un ochju à a classa di paràmetri, postu chì hè rispunsevule per a maiò parte di i paràmetri.
A parte principale
Agente per a cullizzioni è u mantenimentu di una lista di securities
Allora, prima avemu l'ughjettu di l'applicazione faust - hè abbastanza simplice. In seguitu, dichjarà esplicitamente un tema per u nostru agentu ... Eccu vale a pena di dì ciò chì hè, quale hè u paràmetru internu è cumu si pò esse disposti in modu diversu.
Temi in kafka, se vulemu sapè a definizione esatta, hè megliu di leghje off. document, o pudete leghje cumpendiu nantu à Habré in Russu, induve tuttu hè ancu riflessu abbastanza precisamente :)
Parametru internu, Descritta abbastanza bè in u faust doc, ci permette di cunfigurà u tema direttamente in u codice, sicuru, questu significa i paràmetri furniti da i sviluppatori di faust, per esempiu: retenzioni, pulitica di retenzioni (per default eliminà, ma pudete stabilisce fundute), numeru di partizioni per tema (partitionsper fà, per esempiu, menu di significatu glubale applicazioni faust).
In generale, l'agente pò creà un tema gestionatu cù i valori glubale, in ogni modu, mi piace à dichjarà tuttu esplicitamente. Inoltre, certi paràmetri (per esempiu, u numeru di partizioni o pulitica di retenzioni) di u tema in l'annunziu di l'agente ùn ponu esse cunfigurati.
Eccu ciò chì puderia vede senza definisce manualmente u tema:
Ebbè, avà descrivimu ciò chì u nostru agente farà :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Allora, à u principiu di l'agente, apremu una sessione aiohttp per e dumande attraversu u nostru cliente. Cusì, quandu principia un travagliadore, quandu u nostru agente hè lanciatu, una sessione serà immediatamente aperta - unu, per tuttu u tempu chì u travagliadore hè in esecuzione (o parechji, se cambiate u paràmetru). cuncurrenza da un agente cù una unità predeterminata).
Dopu, seguitamu u flussu (pusemu u messagiu in _, Siccomu noi, in questu agentu, ùn importa micca u cuntenutu) di i missaghji da u nostru tema, si esistinu à l'offset attuale, altrimenti u nostru ciculu aspittà per a so ghjunta. Ebbè, in u nostru ciclu, registremu a ricezione di u missaghju, uttene una lista di tituli attivi (get_securities ritorna solu attivu per automaticamente, vede u codice di u cliente) è salvemu in a basa di dati, verificate s'ellu ci hè una sicurità cù u listessu ticker è scambià in a basa di dati, se ci hè, allora (u carta) serà solu aghjurnatu.
Lanciamu a nostra creazione !
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
Caratteristiche PS cumpunente web Ùn cunsiderà micca faust in l'articuli, cusì avemu stabilitu a bandiera approprita.
In u nostru cumandamentu di lanciamentu, avemu dettu à faust induve circà l'ughjettu di l'applicazione è ciò chì deve fà cun ellu (lanciare un travagliadore) cù u livellu di output di u logu d'infurmazioni. Avemu a seguente output:
Fighjemu u settore di partizioni. Comu pudemu vede, un tema hè statu creatu cù u nome chì avemu designatu in u codice, u numeru predeterminatu di partizioni (8, pigliatu da topic_partitions - paràmetru di l'ughjettu di l'applicazione), postu chì ùn avemu micca specificatu un valore individuale per u nostru tema (via partizioni). L'agente lanciatu in u travagliu hè attribuitu tutte e 8 partizioni, postu chì hè l'unicu, ma questu serà discutitu in più detail in a parte di clustering.
Ebbè, avà pudemu andà in una altra finestra di terminal è mandà un missaghju viotu à u nostru tema:
PS usendu @ mostramu chì avemu da mandà un missaghju à un tema chjamatu "collect_securities".
In questu casu, u missaghju andò à a partizione 6 - pudete verificà questu andendu in kafdrop on localhost:9000
Andendu à a finestra di u terminal cù u nostru travagliadore, vedemu un missaghju felice mandatu cù loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Pudemu ancu guardà in mongo (usendu Robo3T o Studio3T) è vede chì i tituli sò in a basa di dati:
Ùn sò micca un miliardariu, è per quessa simu cuntentati cù a prima opzione di visualizazione.
Felicità è gioia - u primu agentu hè prestu :)
Agente prontu, viva u novu agente!
Iè, signori, avemu cupartu solu 1/3 di a strada preparata da questu articulu, ma ùn vi scuraggiate, perchè avà serà più faciule.
Allora avà avemu bisognu di un agente chì recullà meta infurmazione è a mette in un documentu di cullizzioni:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Siccomu questu agentu prucederà l'infurmazioni nantu à una sicurità specifica, avemu bisognu di indicà u ticker (simbulu) di sta sicurità in u messagiu. Per questu scopu in faust ci sò vinile - classi chì dichjaranu u schema di messagiu in u tema di l'agente.
In questu casu, andemu à records.pyè descrive ciò chì u missaghju per questu tema deve esse cum'è:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Cum'è avete capitu, faust usa l'annotazione di u tipu python per descriverà u schema di u messagiu, per quessa chì a versione minima supportata da a biblioteca hè 3.6.
Riturnemu à l'agente, stabilisce i tipi è aghjunghje:
Comu pudete vede, passemu un novu paràmetru cù un schema à u metudu di inizializazione di tema - value_type. In più, tuttu seguita u listessu schema, cusì ùn vecu micca u puntu di aspittà nantu à qualcosa altru.
Ebbè, u toccu finali hè di aghjunghje una chjama à l'agente di cullizzioni di meta infurmazione per collect_securitites:
Utilizemu u schema annunziatu prima per u messagiu. In questu casu, aghju utilizatu u metudu .cast postu chì ùn avemu micca bisognu di aspittà u risultatu da l'agente, ma vale a pena dì chì modi mandate un missaghju à u tema:
cast - ùn blucca micca perchè ùn aspetta micca un risultatu. Ùn pudete micca mandà u risultatu à un altru tema cum'è missaghju.
mandate - ùn blucca micca perchè ùn aspetta micca un risultatu. Pudete specificà un agentu in u tema à quale u risultatu andarà.
dumandà - aspetta un risultatu. Pudete specificà un agente in u tema à quale u risultatu andarà.
Allora, questu hè tuttu cù l'agenti per oghje!
A squadra di sognu
L'ultima cosa chì aghju prumessu di scrive in questa parte hè cumandamenti. Comu diciatu prima, i cumandamenti in faust sò un wrapper intornu à cliccà. In fatti, faust attache semplicemente u nostru cumandamentu persunalizatu à a so interfaccia quandu specifica a chjave -A
Dopu à l'agenti annunciati in agenti.py aghjunghje una funzione cù un decoratore app.commandchjamà u metudu u cast у collect_securities:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Cusì, se chjamemu a lista di cumandamenti, u nostru novu cumandamentu serà in questu:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Pudemu usà cum'è qualcunu altru, allora riavvia u faust worker è cuminciamu una cullizzioni cumpleta di securities:
> faust -A horton.agents start-collect-securities
Chì succede dopu ?
In a parti dopu, utilizendu l'agenti rimanenti cum'è un esempiu, cunsideremu u mecanismu di sink per a ricerca di l'estremi in i prezzi di chjusi di cummerciale per l'annu è u cron launch of agents.