Dakle, tako, drugi dio. Kao što je ranije napisano, u njemu ćemo uraditi sljedeće:
Hajde da napišemo mali klijent za alphavantage na aiohttp sa zahtevima za krajnje tačke koje su nam potrebne.
Kreirajmo agenta koji će prikupljati podatke o hartijama od vrijednosti i meta informacije o njima.
Ali, to je ono što ćemo uraditi za sam projekat, a u smislu istraživanja fausta, naučićemo kako pisati agente koji obrađuju stream događaje iz kafke, kao i kako pisati komande (click wrapper), u našem slučaju - za ručne push poruke na temu koju agent nadgleda.
Trening
AlphaVantage Client
Prvo, napišimo mali aiohttp klijent za zahtjeve za alphavantage.
AlphaVantage API je prilično jednostavno i lijepo dizajniran, pa sam odlučio da sve zahtjeve postavim putem metode construct_query gdje zauzvrat postoji http poziv.
Donosim sva polja snake_case radi praktičnosti.
Pa, dekoracija logger.catch za prekrasan i informativan traceback izlaz.
PS Ne zaboravite dodati alphavantage token lokalno u config.yml, ili izvesti varijablu okruženja HORTON_SERVICE_APIKEY. Dobijamo token ovdje.
CRUD klasa
Imat ćemo kolekciju vrijednosnih papira za čuvanje meta informacija o vrijednosnim papirima.
Za sada ćemo imati najjednostavniju izradu aplikacije, malo kasnije ćemo je proširiti, međutim, kako ne bismo čekali, ovdje reference u App-klasu. Savjetujem vam i da pogledate klasu postavki, jer je ona odgovorna za većinu postavki.
Glavni dio
Agent za prikupljanje i vođenje liste vrijednosnih papira
Dakle, prvo dobijamo objekat aplikacije Faust - to je prilično jednostavno. Zatim eksplicitno deklariramo temu za našeg agenta... Ovdje je vrijedno spomenuti šta je to, koji je interni parametar i kako se to može drugačije urediti.
Teme u kafki, ako želimo da znamo tačnu definiciju, bolje je pročitati isključeno. dokument, ili možete čitati compendium na Habré-u na ruskom, gde je takođe sve prilično tačno odraženo :)
Parametar interni, koji je prilično dobro opisan u faust dokumentu, omogućava nam da konfiguriramo temu direktno u kodu, naravno, to znači parametre koje su dali Faust programeri, na primjer: zadržavanje, politika zadržavanja (podrazumevano brisanje, ali možete podesiti kompaktni), broj particija po temi (particijeučiniti, na primjer, manje od globalnog značaja aplikacije Faust).
Generalno, agent može kreirati upravljanu temu s globalnim vrijednostima, međutim, ja volim da sve eksplicitno deklariram. Osim toga, neki parametri (na primjer, broj particija ili politika zadržavanja) teme u oglasu agenta ne mogu se konfigurirati.
Evo kako bi to moglo izgledati bez ručnog definiranja teme:
Pa, hajde da sada opišemo šta će naš agent uraditi :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Dakle, na početku agenta otvaramo aiohttp sesiju za zahtjeve preko našeg klijenta. Dakle, pri pokretanju worker-a, kada se naš agent pokrene, odmah će se otvoriti sesija - jedna, za cijelo vrijeme pokretanja radnika (ili nekoliko, ako promijenite parametar paralelnost od agenta sa zadanom jedinicom).
Zatim pratimo stream (postavimo poruku _, pošto nas, u ovom agentu, nije briga za sadržaj) poruka iz naše teme, ako postoje na trenutnom ofsetu, inače će naš ciklus čekati njihov dolazak. Pa, unutar naše petlje, evidentiramo prijem poruke, dobijemo listu aktivnih (get_security vraća samo aktivnu po defaultu, vidi klijentov kod) vrijednosnih papira i spremimo je u bazu podataka, provjeravamo da li postoji sigurnost s istim tickerom i razmjena u bazi podataka, ako postoji, onda će se ona (papir) jednostavno ažurirati.
Pokrenimo našu kreaciju!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS Features web komponenta Fausta neću razmatrati u člancima, pa smo postavili odgovarajuću zastavu.
U našoj naredbi za pokretanje rekli smo Faustu gdje da traži objekt aplikacije i šta da radi s njim (pokrene worker) s nivoom izlaza info dnevnika. Dobijamo sljedeći izlaz:
Pogledajmo skup particija. Kao što vidimo, kreirana je tema sa imenom koje smo naznačili u kodu, podrazumevanim brojem particija (8, preuzeto iz topic_partitions - parametar objekta aplikacije), budući da nismo specificirali pojedinačnu vrijednost za našu temu (preko particija). Pokrenutom agentu u workeru je dodijeljeno svih 8 particija, budući da je jedini, ali o tome će biti detaljnije u dijelu o klasteriranju.
Pa, sada možemo otići na drugi prozor terminala i poslati praznu poruku našoj temi:
PS koristeći @ pokazujemo da šaljemo poruku na temu pod nazivom “collect_securities”.
U ovom slučaju, poruka je otišla na particiju 6 - ovo možete provjeriti tako što ćete otići na kafdrop on localhost:9000
Odlaskom na prozor terminala sa našim radnikom, vidjet ćemo sretnu poruku poslanu pomoću loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Također možemo pogledati mongo (koristeći Robo3T ili Studio3T) i vidjeti da su vrijednosni papiri u bazi podataka:
Nisam milijarder i zato smo zadovoljni prvom opcijom gledanja.
Sreća i veselje - prvi agent je spreman :)
Agent spreman, živio novi agent!
Da, gospodo, prešli smo samo 1/3 puta pripremljenog ovim člankom, ali nemojte se obeshrabriti, jer će sada biti lakše.
Dakle, sada nam je potreban agent koji prikuplja meta informacije i stavlja ih u dokument prikupljanja:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Pošto će ovaj agent obraditi informacije o određenoj sigurnosti, potrebno je da u poruci navedemo oznaku (simbol) ove sigurnosti. U tu svrhu u Faustu postoje ploče — klase koje deklariraju šemu poruka u temi agenta.
U ovom slučaju, idemo na records.pyi opišite kako bi poruka za ovu temu trebala izgledati:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Kao što ste mogli pretpostaviti, faust koristi napomenu tipa python da opiše šemu poruke, zbog čega je minimalna verzija koju biblioteka podržava 3.6.
Kao što vidite, metodi inicijalizacije teme prenosimo novi parametar sa šemom - value_type. Nadalje, sve ide po istoj shemi, tako da ne vidim smisla da se zadržavam na bilo čemu drugom.
Pa, posljednji dodir je dodavanje poziva agentu za prikupljanje meta informacija collect_securittes:
Za poruku koristimo prethodno najavljenu šemu. U ovom slučaju koristio sam metodu .cast jer ne moramo čekati rezultat od agenta, ali vrijedi napomenuti da načine pošaljite poruku na temu:
cast - ne blokira jer ne očekuje rezultat. Rezultat ne možete poslati na drugu temu kao poruku.
send - ne blokira jer ne očekuje rezultat. Možete odrediti agenta u temi na koju će ići rezultat.
pitati - čeka rezultat. Možete odrediti agenta u temi na koju će ići rezultat.
Dakle, to je sve sa agentima za danas!
Tim snova
Poslednje što sam obećao da ću napisati u ovom delu su komande. Kao što je ranije spomenuto, komande u Faustu su omotač oko klika. U stvari, faust jednostavno prilaže našu prilagođenu komandu svom interfejsu kada specificira ključ -A
Nakon što su najavljeni agenti u agents.py dodajte funkciju s dekoraterom app.commandpozivanje metode cast у collect_securites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Dakle, ako pozovemo listu naredbi, naša nova komanda će biti u njoj:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Možemo ga koristiti kao i bilo ko drugi, pa hajde da ponovo pokrenemo faust worker i započnemo punu kolekciju vrijednosnih papira:
> faust -A horton.agents start-collect-securities
Šta će biti dalje?
U sljedećem dijelu, koristeći preostale agente kao primjer, razmotrićemo sink mehanizam za traženje ekstrema u cijenama zatvaranja trgovanja za godinu i cron lansiranje agenata.
PS U zadnjem dijelu su me pitali o Faustu i konfluentnoj kafki (koje karakteristike ima konfluent?). Čini se da je konfluent funkcionalniji na mnogo načina, ali činjenica je da Faust nema potpunu klijentsku podršku za konfluent – to proizilazi iz opisi ograničenja klijenata u dok.