Taigi, antra dalis. Kaip parašyta anksčiau, jame atliksime šiuos veiksmus:
Parašykime nedidelį alfavantage klientą aiohttp su užklausomis dėl mums reikalingų galinių taškų.
Sukurkime agentą, kuris rinks duomenis apie vertybinius popierius ir meta informaciją apie juos.
Bet tai mes padarysime pačiam projektui, o kalbant apie „Fausto“ tyrimą, išmoksime rašyti agentus, kurie apdoroja srautinius įvykius iš kafka, taip pat kaip rašyti komandas (spustelėkite įpakavimą), mūsų atveju - rankiniu būdu išsiųstiems pranešimams į temą, kurią agentas stebi.
Mokymai
„AlphaVantage“ klientas
Pirmiausia parašykime nedidelį aiohttp klientą, skirtą alfavantage užklausoms.
„AlphaVantage“ API yra gana paprasta ir gražiai sukurta, todėl nusprendžiau visus prašymus pateikti naudojant metodą construct_query kur savo ruožtu yra http skambutis.
Atvežu visus laukus snake_case patogumui.
Na, logger.catch puošmena gražiam ir informatyviam atsekimui.
PS Nepamirškite į config.yml pridėti alfavantage prieigos rakto arba eksportuoti aplinkos kintamąjį HORTON_SERVICE_APIKEY. Mes gauname žetoną čia.
CRUD klasė
Turėsime vertybinių popierių kolekciją, kurioje saugosime metainformaciją apie vertybinius popierius.
Kol kas turėsime paprasčiausią aplikacijų kūrimą, kiek vėliau ją išplėsime, tačiau kad nereikėtų laukti, čia nuorodos į App-klasę. Taip pat patariu pažvelgti į nustatymų klasę, nes ji atsakinga už daugumą nustatymų.
Pagrindinė dalis
Vertybinių popierių sąrašo rinkimo ir tvarkymo agentas
Taigi, pirmiausia gauname „faust“ programos objektą - tai gana paprasta. Toliau mes aiškiai deklaruojame temą mūsų agentui... Čia verta paminėti, kas tai yra, koks yra vidinis parametras ir kaip tai galima kitaip išdėstyti.
Temos kafka, jei norime sužinoti tikslų apibrėžimą, geriau perskaityti išjungti. dokumentas, arba galite skaityti sąvadas ant Habré rusų kalba, kur viskas taip pat gana tiksliai atspindėta :)
Vidinis parametras, gana gerai aprašyta faust dokumente, leidžia temą sukonfigūruoti tiesiai kode, žinoma, tai reiškia fausto kūrėjų pateiktus parametrus, pvz.: retention, retention policy (pagal numatytuosius nustatymus ištrinti, bet galite nustatyti kompaktiškas), skyrių skaičius vienoje temoje (daugybėpadaryti, pavyzdžiui, mažiau nei pasaulinės reikšmės programos faust).
Apskritai agentas gali sukurti valdomą temą su globaliomis vertybėmis, tačiau man patinka viską deklaruoti aiškiai. Be to, negalima konfigūruoti kai kurių agento skelbimo temos parametrų (pavyzdžiui, skaidinių skaičiaus arba saugojimo politikos).
Štai kaip tai gali atrodyti rankiniu būdu neapibrėžus temos:
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Taigi agento pradžioje atidarome aiohttp seansą užklausoms per savo klientą. Taigi paleidus darbuotoją, paleidus mūsų agentą, iš karto bus atidaryta sesija – viena, visam darbuotojo veikimo laikui (arba kelioms, jei pakeisite parametrą sutapimas iš agento su numatytuoju vienetu).
Toliau sekame srautą (įdedame pranešimą _, nes mums, šiam agentui, nerūpi pranešimų iš mūsų temos turinys, jei jie egzistuoja dabartiniame poslinkyje, kitaip mūsų ciklas lauks jų atvykimo. Na, savo ciklo viduje registruojame pranešimo gavimą, gauname aktyvių (get_securities grąžina tik aktyvius pagal nutylėjimą, žr. kliento kodą) vertybinių popierių sąrašą ir išsaugome jį duomenų bazėje, patikriname, ar yra vertybinių popierių su tuo pačiu žymekliu ir mainai duomenų bazėje , jei yra, tada jis (popierius) bus tiesiog atnaujintas.
Pradėkime savo kūrybą!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS funkcijos žiniatinklio komponentas Fausto straipsniuose nesvarstysiu, todėl iškeliame atitinkamą vėliavėlę.
Savo paleidimo komandoje mes nurodėme faust, kur ieškoti programos objekto ir ką su juo daryti (paleisti darbuotoją) su informacijos žurnalo išvesties lygiu. Gauname tokią išvestį:
Pažiūrėkime į skaidinių rinkinį. Kaip matome, buvo sukurta tema su pavadinimu, kurį nurodėme kode, numatytuoju skaidinių skaičiumi (8, paimti iš temos_skyriai - programos objekto parametras), nes nenurodėme individualios mūsų temos reikšmės (per skaidinius). Darbuotoje paleistam agentui priskiriami visi 8 skirsniai, nes jis yra vienintelis, tačiau tai bus išsamiau aptarta dalyje apie klasterizavimą.
Na, dabar galime pereiti į kitą terminalo langą ir išsiųsti tuščią žinutę į mūsų temą:
PS naudojimas @ parodome, kad siunčiame pranešimą tema, pavadinta „surinkti_vertybinius popierius“.
Šiuo atveju pranešimas nukeliavo į 6 skaidinį – tai galite patikrinti apsilankę kafdrop on localhost:9000
Eidami į terminalo langą su savo darbuotoju, pamatysime laimingą pranešimą, išsiųstą naudojant loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Taip pat galime pažvelgti į mongo (naudojant Robo3T arba Studio3T) ir pamatyti, kad vertybiniai popieriai yra duomenų bazėje:
Nesu milijardierius, todėl esame patenkinti pirmuoju žiūrėjimo variantu.
Laimė ir džiaugsmas - pirmasis agentas pasiruošęs :)
Agentas pasiruošęs, tegyvuoja naujasis agentas!
Taip, ponai, įveikėme tik 1/3 šio straipsnio paruošto kelio, bet nenusiminkite, nes dabar bus lengviau.
Taigi dabar mums reikia agento, kuris renka meta informaciją ir įkelia ją į rinkimo dokumentą:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Kadangi šis agentas apdoros informaciją apie konkretų vertybinį popierių, pranešime turime nurodyti šio saugumo žymeklį (simbolį). Tam tikslui fauste yra Įrašai — klasės, deklaruojančios pranešimų schemą agento temoje.
Šiuo atveju pereikime prie records.pyir aprašykite, kaip turėtų atrodyti šios temos pranešimas:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Kaip jau galėjote atspėti, „faust“ naudoja „python“ tipo anotaciją pranešimo schemai apibūdinti, todėl minimali bibliotekos palaikoma versija yra 3.6.
Grįžkime prie agento, nustatykime tipus ir pridėkime:
Kaip matote, temos inicijavimo metodui perduodame naują parametrą su schema - vertės_tipas. Be to, viskas vyksta pagal tą pačią schemą, todėl nematau prasmės leistis į ką nors kita.
Na, paskutinis prisilietimas yra pridėti iškvietimą metainformacijos rinkimo agentui į collection_securitites:
Pranešimui naudojame anksčiau paskelbtą schemą. Šiuo atveju naudojau .cast metodą, nes nereikia laukti rezultato iš agento, tačiau verta paminėti, kad būdais siųsti žinutę į temą:
cast - neblokuoja, nes nesitiki rezultato. Negalite siųsti rezultato į kitą temą kaip žinutės.
siųsti – neblokuoja, nes nesitiki rezultato. Temoje galite nurodyti agentą, į kurį bus nukreiptas rezultatas.
paklausti - laukia rezultato. Temoje galite nurodyti agentą, į kurį bus nukreiptas rezultatas.
Taigi, su agentais šiandien viskas!
Svajonių komanda
Paskutinis dalykas, kurį pažadėjau parašyti šioje dalyje, yra komandos. Kaip minėta anksčiau, „faust“ komandos yra paspaudimas. Tiesą sakant, Faustas tiesiog prideda mūsų pasirinktinę komandą prie savo sąsajos, kai nurodo klavišą -A
Paskelbtiems agentams įėjus agentai.py pridėti funkciją su dekoratoriumi app.commandvadindamas metodą mesti у rinkti_vertybinius popierius:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Taigi, jei iškviesime komandų sąrašą, jame bus mūsų nauja komanda:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Galime jį naudoti kaip bet kas kitas, todėl iš naujo paleiskite „faust worker“ ir pradėkime visavertį vertybinių popierių rinkimą:
> faust -A horton.agents start-collect-securities
Kas bus toliau?
Kitoje dalyje, kaip pavyzdį naudodamiesi likusiais agentais, apžvelgsime nugrimzdimo mechanizmą ieškant kraštutinumų prekybos metų uždarymo kainose ir agentų kronų paleidime.
PS Paskutinėje dalyje manęs paklausė apie faust and confluent kafka (kokias savybes turi confluentas?). Atrodo, kad „confluent“ daugeliu atžvilgių yra funkcionalesnis, tačiau faktas yra tas, kad „Faust“ neturi visiško „confluento“ klientų palaikymo – tai išplaukia iš kliento apribojimų aprašymai doc.