Dus, het tweede deel. Zoals eerder geschreven, zullen we daarin het volgende doen:
Laten we een kleine client voor alphavantage op aiohttp schrijven met verzoeken voor de eindpunten die we nodig hebben.
Laten we een agent creëren die gegevens over effecten en meta-informatie daarover verzamelt.
Maar dit is wat we zullen doen voor het project zelf, en in termen van faustonderzoek zullen we leren hoe we agenten moeten schrijven die stroomgebeurtenissen van Kafka verwerken, en hoe we opdrachten moeten schrijven (klik-wrapper), in ons geval - voor handmatige pushberichten naar het onderwerp dat de agent in de gaten houdt.
Opleiding
AlphaVantage-klant
Laten we eerst een kleine aiohttp-client schrijven voor verzoeken aan alphavantage.
De AlphaVantage API is vrij eenvoudig en mooi ontworpen, dus besloot ik alle verzoeken via de methode in te dienen construct_query waar op zijn beurt een http-oproep is.
Ik breng alle velden naar snake_case voor comfort.
Welnu, de logger.catch-decoratie voor mooie en informatieve traceback-uitvoer.
PS Vergeet niet om het alphavantage-token lokaal toe te voegen aan config.yml, of de omgevingsvariabele te exporteren HORTON_SERVICE_APIKEY. Wij ontvangen een token hier.
CRUD-klasse
We zullen een effectencollectie hebben om meta-informatie over effecten op te slaan.
Voorlopig zullen we de eenvoudigste applicatiecreatie hebben, iets later zullen we deze uitbreiden, maar om u niet te laten wachten, hier referenties naar App-klasse. Ik raad je ook aan om eens naar de instellingenklasse te kijken, aangezien deze verantwoordelijk is voor de meeste instellingen.
Het grootste deel
Agent voor het verzamelen en bijhouden van een lijst met effecten
Dus eerst krijgen we het Faust-toepassingsobject - het is vrij eenvoudig. Vervolgens declareren we expliciet een onderwerp voor onze agent... Hier is het de moeite waard om te vermelden wat het is, wat de interne parameter is en hoe dit anders kan worden geregeld.
Onderwerpen in kafka, als we de exacte definitie willen weten, is het beter om te lezen uit. document, of je kunt lezen abstract op Habré in het Russisch, waar alles ook vrij nauwkeurig wordt weergegeven :)
Parameter intern, vrij goed beschreven in het faust-document, stelt ons in staat om het onderwerp rechtstreeks in de code te configureren, dit betekent natuurlijk de parameters die door de faust-ontwikkelaars zijn verstrekt, bijvoorbeeld: retentie, retentiebeleid (standaard verwijderd, maar je kunt compact), aantal partities per onderwerp (scoresom bijvoorbeeld minder te doen dan mondiale betekenis toepassingen faus).
Over het algemeen kan de agent een beheerd onderwerp met globale waarden maken, maar ik wil alles graag expliciet declareren. Bovendien kunnen sommige parameters (bijvoorbeeld het aantal partities of het bewaarbeleid) van het onderwerp in de agentadvertentie niet worden geconfigureerd.
Zo zou het eruit kunnen zien zonder het onderwerp handmatig te definiëren:
Laten we nu beschrijven wat onze agent zal doen :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Dus aan het begin van de agent openen we een aiohttp-sessie voor verzoeken via onze klant. Wanneer u dus een werker start en onze agent wordt gestart, wordt er onmiddellijk een sessie geopend: één, voor de hele tijd dat de werker actief is (of meerdere, als u de parameter wijzigt samenloop van een agent met een standaardeenheid).
Vervolgens volgen we de stream (we plaatsen het bericht in _, aangezien wij, in deze agent, niet om de inhoud geven) van berichten van ons onderwerp, als ze op de huidige offset bestaan, anders zal onze cyclus wachten op hun aankomst. Welnu, binnen onze lus registreren we de ontvangst van het bericht, krijgen een lijst met actieve effecten (get_securities retourneert standaard alleen actief, zie klantcode) en slaan deze op in de database, waarbij we controleren of er een effect is met dezelfde ticker en uitwisseling in de database, als die er is, dan wordt deze (het papier) eenvoudigweg bijgewerkt.
Laten we onze creatie lanceren!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS-functies webcomponent Ik zal Faust niet in de artikelen beschouwen, dus hebben we de juiste vlag geplaatst.
In ons startcommando vertelden we Faust waar we naar het applicatieobject moesten zoeken en wat we ermee moesten doen (een werker starten) met het infolog-uitvoerniveau. We krijgen de volgende uitvoer:
Laten we eens kijken naar de partitieset. Zoals we kunnen zien, is er een onderwerp gemaakt met de naam die we in de code hebben aangegeven, het standaardaantal partities (8, overgenomen van onderwerp_partities - applicatieobjectparameter), omdat we geen individuele waarde voor ons onderwerp hebben opgegeven (via partities). De gelanceerde agent in de worker krijgt alle 8 partities toegewezen, omdat dit de enige is, maar dit zal in meer detail worden besproken in het deel over clustering.
Welnu, nu kunnen we naar een ander terminalvenster gaan en een leeg bericht naar ons onderwerp sturen:
PS gebruikt @ we laten zien dat we een bericht sturen naar een onderwerp met de naam “collect_securities”.
In dit geval ging het bericht naar partitie 6. U kunt dit controleren door naar kafdrop on te gaan localhost:9000
Als we met onze medewerker naar het terminalvenster gaan, zien we een vrolijk bericht verzonden met loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
We kunnen ook naar mongo kijken (met behulp van Robo3T of Studio3T) en zien dat de effecten in de database staan:
Ik ben geen miljardair en daarom zijn we tevreden met de eerste kijkmogelijkheid.
Geluk en vreugde - de eerste agent is klaar :)
Agent klaar, lang leve de nieuwe agent!
Ja heren, we hebben slechts 1/3 van het pad afgelegd dat in dit artikel is voorbereid, maar wees niet ontmoedigd, want nu zal het gemakkelijker zijn.
Dus nu hebben we een agent nodig die meta-informatie verzamelt en deze in een verzameldocument plaatst:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Omdat deze agent informatie over een specifiek effect verwerkt, moeten we de ticker (symbool) van dit effect in het bericht vermelden. Voor dit doel zijn er in Faust Archief — klassen die het berichtenschema in het agentonderwerp declareren.
In dit geval gaan we naar records.pyen beschrijf hoe de boodschap voor dit onderwerp eruit zou moeten zien:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Zoals je misschien al geraden hebt, gebruikt Faust de annotatie van het Python-type om het berichtenschema te beschrijven. Daarom is de minimale versie die door de bibliotheek wordt ondersteund 3.6.
Laten we terugkeren naar de agent, de typen instellen en deze toevoegen:
Zoals u kunt zien, geven we een nieuwe parameter met een schema door aan de onderwerpinitialisatiemethode: waarde_type. Verder volgt alles hetzelfde schema, dus ik zie geen enkel nut om bij iets anders stil te staan.
Welnu, de laatste hand is het toevoegen van een oproep aan de meta-informatieverzamelingsagent om collect_securitites:
Voor het bericht gebruiken we het eerder aangekondigde schema. In dit geval heb ik de .cast-methode gebruikt, omdat we niet hoeven te wachten op het resultaat van de agent, maar het is de moeite waard om dat te vermelden manieren stuur een bericht naar het onderwerp:
cast - blokkeert niet omdat er geen resultaat wordt verwacht. Je kunt het resultaat niet als bericht naar een ander onderwerp sturen.
verzenden - blokkeert niet omdat er geen resultaat wordt verwacht. U kunt een agent opgeven in het onderwerp waar het resultaat naartoe gaat.
vragen - wacht op een resultaat. U kunt een agent opgeven in het onderwerp waar het resultaat naartoe gaat.
Dit was dus alles met agenten voor vandaag!
Het droomteam
Het laatste dat ik beloofde in dit deel te schrijven zijn opdrachten. Zoals eerder vermeld, zijn opdrachten in Faust een omhulsel rond klikken. In feite koppelt Faust eenvoudigweg ons aangepaste commando aan zijn interface bij het specificeren van de -A-sleutel
Nadat de aangekondigde agenten binnen waren agenten.py voeg een functie toe met een decorateur app.opdrachtde methode aanroepen gegoten у collect_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Dus als we de lijst met opdrachten oproepen, zal onze nieuwe opdracht erin staan:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
We kunnen het net als ieder ander gebruiken, dus laten we de faust-werker opnieuw opstarten en beginnen met een volwaardige verzameling van effecten:
> faust -A horton.agents start-collect-securities
Wat gebeurt er daarna?
In het volgende deel zullen we, met de overige agenten als voorbeeld, het sink-mechanisme beschouwen voor het zoeken naar extremen in de slotkoersen van de handel voor het jaar en de cron-lancering van agenten.
Dat is alles voor vandaag! Bedankt voor het lezen :)