Also, also, den zweeten Deel. Wéi virdru geschriwwen, wäerte mir déi folgend maachen:
Loosst eis e klenge Client fir Alphavantage op aiohttp schreiwen mat Ufroe fir d'Endpunkte déi mir brauchen.
Loosst eis en Agent erstellen deen Daten iwwer Wäertpabeieren a Meta-Informatioune sammelt.
Awer, dat ass wat mir fir de Projet selwer maachen, a wat d'Faustfuerschung ugeet, léiere mir wéi Agenten schreiwen déi Stream-Evenementer vu Kafka veraarbechten, wéi och Kommandoen (Klick-Wrapper) schreiwen, an eisem Fall - fir manuell Push Messagen zum Thema dat den Agent iwwerwaacht.
Virbereedung
AlphaVantage Client
Als éischt schreiwen mer e klenge aiohttp Client fir Ufroe fir Alphavantage.
D'AlphaVantage API ass ganz einfach a schéin entworf, also hunn ech beschloss all Ufroen duerch d'Method ze maachen construct_query wou am Tour gëtt et en http Uruff.
Ech bréngen all Felder op snake_case fir Komfort.
Gutt, de Logger.catch Dekoratioun fir schéin an informativ Traceback Output.
PS Vergiesst net den Alphavantage Token lokal op config.yml ze addéieren, oder d'Ëmweltvariabel exportéieren HORTON_SERVICE_APIKEY. Mir kréien en Token hei.
CRUD Klass
Mir wäerten eng Sammlung vu Wäertpabeieren hunn fir Meta-Informatioun iwwer Wäertpabeieren ze späicheren.
Fir elo wäerte mir déi einfachst Applikatioun kreéieren, e bësse méi spéit wäerte mir se awer ausbauen, fir Iech net ze waarden, hei Referenze zu App-Klass. Ech roden Iech och d'Astellungsklass ze kucken, well se fir déi meescht Astellunge verantwortlech ass.
Haaptsaach
Agent fir eng Lëscht vu Wäertpabeieren ze sammelen an z'erhalen
Also, als éischt kréien mir de Faust Applikatiounsobjekt - et ass ganz einfach. Als nächst erkläre mir explizit en Thema fir eisen Agent ... Hei ass et derwäert ze ernimmen wat et ass, wat den internen Parameter ass a wéi dëst anescht arrangéiert ka ginn.
Themen am Kafka, wa mir déi genau Definitioun wësse wëllen, ass et besser ze liesen aus. Dokument, Oder Dir kënnt liesen Kompendium op Habré op Russesch, wou och alles zimlech genee reflektéiert gëtt :)
Parameter intern, ganz gutt am faust doc beschriwwen, erlaabt eis d'Thema direkt am Code ze konfiguréieren, natierlech, dat heescht d'Parameteren, déi vun de Faust Entwéckler geliwwert ginn, zum Beispill: Retentioun, Retentiounspolitik (par défaut läschen, awer Dir kënnt astellen kompakt), Zuel vun de Partitionen pro Thema (Partiturenze maachen, zum Beispill, manner wéi global Bedeitung Applikatiounen faust).
Am Allgemengen kann den Agent e verwalteten Thema mat globale Wäerter erstellen, awer ech wëll alles explizit deklaréieren. Zousätzlech kënnen e puer Parameteren (zum Beispill d'Zuel vun de Partitionen oder d'Retentiounspolitik) vum Thema an der Agent Annonce net konfiguréiert ginn.
Hei ass wéi et ausgesäit ouni d'Thema manuell ze definéieren:
Gutt, loosst eis elo beschreiwen wat eisen Agent wäert maachen :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Also, am Ufank vum Agent, öffnen mir eng aiohttp Sessioun fir Ufroen duerch eise Client. Also, wann Dir en Aarbechter start, wann eisen Agent lancéiert gëtt, gëtt eng Sessioun direkt opgemaach - eng, fir déi ganz Zäit wou den Aarbechter leeft (oder e puer, wann Dir de Parameter ännert) gläichzäiteg vun engem Agent mat enger Standardunitéit).
Als nächst verfollege mir de Stream (mir setzen de Message an _, well mir, an dësem Agent, egal iwwer den Inhalt) vun Messagen aus eisem Thema, wa se existéieren am aktuellen Offset, soss wäert eisen Zyklus op hir Arrivée waarden. Gutt, an eiser Loop protokolléiere mir den Empfang vun der Noriicht, kréien eng Lëscht vun aktive (get_securities gëtt nëmmen aktiv als Standard zréck, kuckt Clientcode) Wäertpabeieren a späicheren se an d'Datebank, kontrolléiert ob et eng Sécherheet mat deemselwechten Ticker gëtt an Austausch an der Datebank , wann et ass, da gëtt et (de Pabeier) einfach aktualiséiert.
Loosst eis eis Kreatioun starten!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS Fonctiounen Web Komponente Ech wäert de Faust net an den Artikelen berücksichtegen, also setzen mir de passende Fändel.
An eisem Startbefehl hu mir de Faust gesot, wou een no der Applikatiounsobjet sicht a wat mat deem ze maachen (en Aarbechter starten) mam Info-Logoutputniveau. Mir kréien déi folgend Ausgang:
Loosst eis d'Partitionsset kucken. Wéi mir kënne gesinn, gouf en Thema erstallt mam Numm dee mir am Code bezeechent hunn, d'Standardzuel vun de Partitionen (8, geholl aus topic_Partitionen - Applikatiounsobjektparameter), well mir keen individuellen Wäert fir eist Thema uginn hunn (iwwer Partitionen). De lancéierten Agent am Aarbechter gëtt all 8 Partitionen zougewisen, well et deen eenzegen ass, awer dëst wäert méi am Detail am Deel iwwer Clustering diskutéiert ginn.
Gutt, elo kënne mir op eng aner Terminalfenster goen an en eidele Message un eist Thema schécken:
PS benotzt @ mir weisen datt mir e Message un en Thema mam Numm "collect_securities" schécken.
An dësem Fall ass de Message op d'Partition 6 gaang - Dir kënnt dëst kontrolléieren andeems Dir op kafdrop op gitt localhost:9000
Gitt an d'Terminalfenster mat eisem Aarbechter, gesi mir e gléckleche Message geschéckt mat Loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Mir kënnen och Mongo kucken (mat Robo3T oder Studio3T) a kucken datt d'Sécherheeten an der Datebank sinn:
Ech si kee Milliardär, an dofir si mir zefridden mat der éischter Vueoptioun.
Gléck a Freed - den éischten Agent ass prett :)
Agent prett, laang liewen den neien Agent!
Jo, Hären, mir hunn nëmmen 1/3 vum Wee, deen an dësem Artikel preparéiert ass, ofgedeckt, awer net decouragéiert, well elo gëtt et méi einfach.
Also elo brauche mir en Agent deen Meta Informatioun sammelt an se an e Sammeldokument setzt:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Well dësen Agent Informatioun iwwer eng spezifesch Sécherheet veraarbecht, musse mir den Ticker (Symbol) vun dëser Sécherheet an der Noriicht uginn. Fir dësen Zweck am Faust ginn et Records - Klassen déi de Message Schema am Agent Thema erklären.
An dësem Fall, loosst eis goen records.pya beschreiwen wéi de Message fir dëst Thema soll ausgesinn:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Wéi Dir vläicht scho virgestallt hutt, benotzt de Faust d'Python-Typ Annotatioun fir de Message Schema ze beschreiwen, dofir ass d'Mindest Versioun ënnerstëtzt vun der Bibliothéik 3.6.
Loosst eis zréck op den Agent, setzen d'Typen a fügen se derbäi:
Wéi Dir gesitt, passéiere mir en neie Parameter mat engem Schema un d'Thema Initialiséierungsmethod - value_type. Weider follegt alles dem selwechte Schema, also gesinn ech kee Sënn fir op soss eppes ze bleiwen.
Gutt, de leschte Touch ass en Uruff un de Meta Informatiounssammlung Agent fir collect_securitites ze addéieren:
Mir benotzen de virdru ugekënnegt Schema fir de Message. An dësem Fall hunn ech d'.cast Method benotzt well mir net op d'Resultat vum Agent musse waarden, awer et ass derwäert ze ernimmen datt Weeër schéckt e Message zum Thema:
Besetzung - blockéiert net well et kee Resultat erwaart. Dir kënnt d'Resultat net an en anert Thema als Message schécken.
schécken - blockéiert net well et kee Resultat erwaart. Dir kënnt en Agent spezifizéieren am Thema op deen d'Resultat geet.
froen - waart op e Resultat. Dir kënnt en Agent spezifizéieren am Thema op deen d'Resultat geet.
Also, dat ass alles mat Agenten fir haut!
D'Dream Team
Déi lescht Saach, déi ech versprach hunn an dësem Deel ze schreiwen ass Kommandoen. Wéi virdru scho gesot, Kommandoen am Faust sinn e Wrapper ronderëm Klick. Tatsächlech befestegt de Faust einfach eise personaliséierte Kommando op seng Interface wann Dir den -A Schlëssel spezifizéiert
No der ugekënnegt Agenten an agents.py eng Funktioun mat engem Dekorateur addéieren app.kommandod'Method nennen gegruewen у collect_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Also, wa mir d'Lëscht vun de Befehle nennen, da wäert eisen neie Kommando dra sinn:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Mir kënnen et benotzen wéi jiddereen aneren, also loosst eis de Faust Aarbechter nei starten an eng vollwäerteg Sammlung vu Wäertpabeieren ufänken:
> faust -A horton.agents start-collect-securities
Wat wäert dann geschéien?
Am nächsten Deel, andeems Dir déi verbleiwen Agenten als Beispill benotzt, wäerte mir de Sinkmechanismus fir d'Sich no Extremen an de Schlusspräisser vum Handel fir d'Joer an d'Cron-Start vun Agenten berücksichtegen.
PS Ënnert dem leschten Deel gouf ech iwwer Faust a confluent Kafka gefrot (wat Features huet confluent?). Et schéngt, datt confluent méi funktionell a ville Weeër ass, awer de Fakt ass datt de Faust keng voll Client Ënnerstëtzung fir Confluent huet - dëst folgt aus Beschreiwunge vu Client Restriktiounen am Dok.