Joten, niin, toinen osa. Kuten aiemmin kirjoitettiin, teemme siinä seuraavat:
Kirjoitetaan pieni asiakas alfavantagelle aiohttp:lle, jossa pyydetään tarvitsemiamme päätepisteitä.
Luodaan agentti, joka kerää tietoja arvopapereista ja metatietoja niistä.
Mutta tämän me teemme itse projektille, ja faust-tutkimuksen kannalta opimme kirjoittamaan agentteja, jotka käsittelevät kafkan suoratoistotapahtumia, sekä kuinka kirjoittaa komentoja (klikkaa käärettä), meidän tapauksessamme - manuaalisille push-viesteille agentin valvomaan aiheeseen.
Koulutus
AlphaVantage-asiakas
Ensin kirjoitetaan pieni aiohttp-asiakas alfavantage-pyyntöjä varten.
AlphaVantage API on melko yksinkertainen ja kauniisti suunniteltu, joten päätin tehdä kaikki pyynnöt menetelmän kautta construct_query jossa vuorostaan on http-kutsu.
Tuon kaikki kentät snake_case mukavuuden vuoksi.
Logger.catch-koristelu kauniiseen ja informatiiviseen jäljitystulokseen.
PS Älä unohda lisätä alphavantage-tunnusta paikallisesti config.yml-tiedostoon tai viedä ympäristömuuttujaa HORTON_SERVICE_APIKEY. Saamme merkin täällä.
CRUD-luokka
Meillä on arvopaperikokoelma tallentaaksemme metatietoja arvopapereista.
Toistaiseksi meillä on yksinkertaisin sovellusten luominen, hieman myöhemmin laajennamme sitä, mutta jotta et joutuisi odottamaan, tässä viittauksia App-luokkaan. Suosittelen myös katsomaan asetusluokkaa, koska se vastaa suurimmasta osasta asetuksista.
Pääosa
Agentti arvopaperiluettelon keräämiseen ja ylläpitämiseen
Joten ensin saamme faust-sovellusobjektin - se on melko yksinkertaista. Seuraavaksi ilmoitamme nimenomaisesti agentillemme aiheen... Tässä kannattaa mainita, mikä se on, mikä on sisäinen parametri ja miten tämä voidaan järjestää toisin.
Kafkan aiheet, jos haluamme tietää tarkan määritelmän, on parempi lukea vinossa. asiakirja, tai voit lukea yhteenveto venäjänkielisellä Habrella, jossa kaikki heijastuu myös melko tarkasti :)
Sisäinen parametri, joka on kuvattu melko hyvin faust-dokumentissa, mahdollistaa aiheen määrittämisen suoraan koodissa, tämä tarkoittaa tietysti faust-kehittäjien antamia parametreja, esimerkiksi: säilyttäminen, säilytyskäytäntö (oletuksena poistaa, mutta voit asettaa kompakti), osioiden lukumäärä aihetta kohti (tuloksettehdä esimerkiksi vähemmän kuin maailmanlaajuista merkitystä sovellukset faust).
Yleensä agentti voi luoda hallitun aiheen globaaleilla arvoilla, mutta haluan ilmoittaa kaiken eksplisiittisesti. Lisäksi joitain agenttimainoksen aiheen parametreja (esimerkiksi osioiden lukumäärää tai säilytyskäytäntöä) ei voi määrittää.
Tältä se saattaa näyttää ilman aiheen manuaalista määrittelyä:
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Joten agentin alussa avaamme aiohttp-istunnon asiakkaamme kautta oleville pyynnöille. Siten työntekijää käynnistettäessä, kun agenttimme käynnistetään, istunto avautuu välittömästi - yksi, koko työntekijän ollessa käynnissä (tai useita, jos muutat parametria samanaikaisuuden agentilta, jolla on oletusyksikkö).
Seuraavaksi seuraamme streamia (sijoitamme viestin _, koska me tässä agentissa emme välitä aiheemme viestien sisällöstä, jos ne ovat olemassa nykyisellä siirrolla, muuten syklimme odottaa niiden saapumista. No, silmukassamme kirjaamme viestin vastaanoton, hankimme luettelon aktiivisista (get_securities palauttaa oletusarvoisesti vain aktiivisia, katso asiakaskoodi) arvopapereista ja tallennamme sen tietokantaan tarkistaen, onko arvopapereita samalla tickerillä ja vaihto tietokannassa, jos sellainen on, se (paperi) yksinkertaisesti päivitetään.
Aloitetaan luomuksemme!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS-ominaisuudet verkkokomponentti En ota Faustia artikkeleissa huomioon, joten asetamme oikean lipun.
Käynnistyskomennossamme kerroimme faustille, mistä sovellusobjektia etsitään ja mitä sillä tehdään (käynnistetään worker) infolokin lähtötasolla. Saamme seuraavan tuloksen:
Katsotaanpa osiojoukkoa. Kuten näemme, aihe luotiin nimellä, jonka määritimme koodissa, oletusarvoisella osioiden määrällä (8, otettu topic_partitions - sovellusobjektiparametri), koska emme määrittäneet aiheellemme yksittäistä arvoa (osioiden kautta). Työntekijässä käynnistetylle agentille on osoitettu kaikki 8 osiota, koska se on ainoa, mutta tätä käsitellään tarkemmin klusterointia käsittelevässä osassa.
No, nyt voimme mennä toiseen pääteikkunaan ja lähettää tyhjän viestin aiheeseemme:
PS käytössä @ näytämme, että lähetämme viestin aiheeseen nimeltä "kerää_arvopaperit".
Tässä tapauksessa viesti meni osioon 6 - voit tarkistaa tämän menemällä osoitteeseen kafdrop on localhost:9000
Menemme työntekijämme kanssa pääteikkunaan, näemme iloisen viestin, joka lähetetään logurulla:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Voimme myös tarkastella mongoa (Robo3T:llä tai Studio3T:llä) ja nähdä, että arvopaperit ovat tietokannassa:
En ole miljardööri, ja siksi olemme tyytyväisiä ensimmäiseen katseluvaihtoehtoon.
Onnea ja iloa - ensimmäinen agentti on valmis :)
Agentti valmis, eläköön uusi agentti!
Kyllä, herrat, olemme käyneet läpi vain 1/3 tämän artikkelin laatimasta polusta, mutta älkää lannistuko, sillä nyt se on helpompaa.
Joten nyt tarvitsemme agentin, joka kerää metatietoja ja laittaa ne keräysasiakirjaan:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Koska tämä agentti käsittelee tietoja tietystä arvopaperista, meidän on ilmoitettava tämän suojauksen merkki (symboli) viestissä. Tätä tarkoitusta varten faustissa on Asiakirjat — luokat, jotka ilmoittavat viestimallin agenttiaiheessa.
Tässä tapauksessa mennään records.pyja kuvaile, miltä tämän aiheen viestin pitäisi näyttää:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Kuten arvata saattaa, faust käyttää python-tyyppistä huomautusta viestiskeeman kuvaamiseen, minkä vuoksi kirjaston tukema vähimmäisversio on 3.6.
Palataan agenttiin, asetetaan tyypit ja lisätään se:
Kuten näet, välitämme aiheen alustusmenetelmälle uuden parametrin kaavan kanssa - arvo_tyyppi. Lisäksi kaikki noudattaa samaa kaavaa, joten en näe mitään järkeä keskittyä mihinkään muuhun.
No, viimeinen silaus on lisätä kutsu metatietojen keräämisagentille collection_securitites:
Käytämme viestissä aiemmin ilmoitettua mallia. Tässä tapauksessa käytin .cast-menetelmää, koska meidän ei tarvitse odottaa agentin tulosta, mutta on syytä mainita, että tapoja lähetä viesti aiheeseen:
heittää - ei estä, koska se ei odota tulosta. Tulosta ei voi lähettää viestinä toiseen aiheeseen.
lähetä - ei estä, koska se ei odota tulosta. Voit määrittää aiheessa agentin, jolle tulos siirtyy.
kysyä - odottaa tulosta. Voit määrittää aiheessa agentin, jolle tulos siirtyy.
Joten, tässä kaikki agenttien kanssa tälle päivälle!
Unelmajoukkue
Viimeinen asia, jonka lupasin kirjoittaa tähän osaan, ovat komennot. Kuten aiemmin mainittiin, faustin komennot ovat kääre napsautuksen ympärillä. Itse asiassa Faust yksinkertaisesti liittää mukautetun komentomme käyttöliittymäänsä määrittäessään -A-näppäintä
Ilmoitettujen agenttien jälkeen agents.py lisää toiminto sisustajalla app.commandkutsumalla menetelmää heittää у kerää_arvopapereita:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Joten jos kutsumme komentoluetteloa, uusi komentomme on siinä:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Voimme käyttää sitä kuten kuka tahansa, joten käynnistetään faust worker uudelleen ja aloitetaan täysi arvopaperien kerääminen:
> faust -A horton.agents start-collect-securities
Mitä seuraavaksi tapahtuu?
Seuraavassa osassa tarkastellaan jäljelle jääneitä agentteja esimerkkinä nielumekanismia äärimmäisyyksien etsimiseen vuoden kaupankäynnin päätöskursseista ja agenttien kruunujen lanseerauksesta.