Don haka, kashi na biyu. Kamar yadda aka rubuta a baya, a ciki za mu yi kamar haka:
Bari mu rubuta ƙaramin abokin ciniki don haruffa akan aiohttp tare da buƙatun ƙarshen abubuwan da muke buƙata.
Bari mu ƙirƙiri wakili wanda zai tattara bayanai akan tsaro da bayanan meta akan su.
Amma, wannan shine abin da za mu yi don aikin da kansa, kuma dangane da bincike mai zurfi, za mu koyi yadda ake rubuta wakilai waɗanda ke aiwatar da abubuwan da suka faru daga kafka, da kuma yadda ake rubuta umarni (danna wrapper), a cikin yanayinmu - don saƙonnin turawa da hannu zuwa batun da wakilin ke sa ido.
Horo
Abokin ciniki na AlphaVantage
Da farko, bari mu rubuta ƙaramin abokin ciniki aiohttp don buƙatun haruffa.
API ɗin AlphaVantage yana da sauƙi kuma an tsara shi da kyau, don haka na yanke shawarar yin duk buƙatun ta hanyar construct_query inda kuma akwai kiran http.
Ina kawo duk filayen zuwa snake_case don dacewa.
Da kyau, kayan ado na logger.catch don kyakkyawan fitarwa mai ban sha'awa.
PS Kar a manta da ƙara alamar haruffa a cikin gida zuwa config.yml, ko fitarwa canjin yanayi HORTON_SERVICE_APIKEY. Muna karɓar alama a nan.
Babban darajar CRUD
Za mu sami tarin bayanan sirri don adana bayanan meta game da tsaro.
A yanzu za mu sami mafi sauƙin ƙirƙirar aikace-aikacen, nan gaba kadan za mu fadada shi, duk da haka, don kada ku jira, a nan. nassoshi zuwa App-class. Ina kuma ba ku shawarar ku duba ajin saitin, tunda shi ne ke da alhakin yawancin saitunan.
Don haka, da farko za mu sami abin aikace-aikacen faust - abu ne mai sauƙi. Na gaba, muna bayyana wani batu a fili ga wakilinmu ... Anan yana da daraja ambaton abin da yake, menene ma'aunin ciki da kuma yadda za'a iya shirya wannan daban.
Batutuwa a cikin kafka, idan muna son sanin ainihin ma'anar, yana da kyau mu karanta kashe. daftarin aiki, ko kuma za ku iya karantawa compendium akan Habré a cikin harshen Rashanci, inda komai kuma yake nunawa daidai :)
Siga na ciki, wanda aka bayyana da kyau a cikin faust doc, yana ba mu damar daidaita batun kai tsaye a cikin lambar, ba shakka, wannan yana nufin sigogin da masu haɓaka faust suka bayar, misali: riƙewa, manufofin riƙewa (ta hanyar sharewa ta asali, amma zaku iya saitawa. m), adadin sassan kowane batu (scoresyi, misali, kasa da muhimmancin duniya aikace-aikace masu sauri).
Gabaɗaya, wakili na iya ƙirƙirar batun sarrafawa tare da ƙimar duniya, duk da haka, Ina so in bayyana komai a sarari. Bugu da ƙari, wasu sigogi (misali, adadin ɓangarori ko manufofin riƙewa) na jigo a cikin tallan wakili ba za a iya daidaita su ba.
Ga yadda zai yi kama ba tare da bayyana batun da hannu ba:
To, yanzu bari mu bayyana abin da wakilinmu zai yi :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Don haka, a farkon wakilin, muna buɗe taron aiohttp don buƙatun ta abokin cinikinmu. Don haka, lokacin fara ma'aikaci, lokacin da aka ƙaddamar da wakilinmu, za a buɗe wani zama nan da nan - ɗaya, na tsawon lokacin da ma'aikaci ke gudana (ko da yawa, idan kun canza ma'auni). daidaituwa daga wakili tare da tsoho naúrar).
Bayan haka, muna bin rafi (mun sanya saƙon a ciki _, Tun da mu, a cikin wannan wakili, ba mu damu da abubuwan da ke ciki ba) na saƙonni daga batunmu, idan sun kasance a halin yanzu, in ba haka ba sake zagayowar mu zai jira zuwan su. Da kyau, a cikin madaukinmu, muna shigar da karɓar saƙon, mu sami jerin abubuwan aiki (get_securities yana dawo da aiki ta tsohuwa, duba lambar abokin ciniki) kuma mu adana shi cikin ma'ajin bayanai, bincika idan akwai tsaro tare da tikiti iri ɗaya kuma musanya a cikin rumbun adana bayanai , idan akwai, to (takardar) za a sabunta ta kawai.
Bari mu kaddamar da mu halitta!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
Abubuwan PS bangaren yanar gizo Ba zan yi la'akari da faust a cikin labaran ba, don haka mun saita tutar da ta dace.
A cikin umarnin ƙaddamar da mu, mun gaya wa faust inda za a nemo abin aikace-aikacen da abin da za a yi da shi (ƙaddamar da ma'aikaci) tare da matakin fitarwar bayanan bayanai. Muna samun fitarwa mai zuwa:
Bari mu dubi saitin bangare. Kamar yadda muke iya gani, an ƙirƙiri wani batu tare da sunan da muka zayyana a cikin lambar, tsohuwar adadin ɓangarori (8, an ɗauko daga topic_partitions - sigar abu na aikace-aikacen), tunda ba mu ƙididdige ƙimar mutum ɗaya don batunmu ba (ta hanyar ɓangarori). Wakilin da aka ƙaddamar a cikin ma'aikaci an sanya shi duka sassan 8, tun da shi kaɗai ne, amma za a tattauna wannan dalla-dalla a cikin ɓangaren game da tari.
To, yanzu za mu iya zuwa wata tagar tasha kuma mu aika sako mara kyau zuwa maudu'inmu:
PS mai amfani @ mun nuna cewa muna aika sako zuwa wani batu mai suna "collect_securities".
A wannan yanayin, saƙon ya tafi partition 6 - za ka iya duba wannan ta zuwa kafdrop on localhost:9000
Tafiya zuwa taga tasha tare da ma'aikacinmu, za mu ga sakon farin ciki da aka aika ta amfani da loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Hakanan zamu iya duba cikin mongo (ta amfani da Robo3T ko Studio3T) kuma mu ga cewa bayanan suna cikin bayanan:
Ni ba biloniya ba ne, don haka mun gamsu da zaɓin kallo na farko.
Farin ciki da farin ciki - wakili na farko ya shirya :)
Wakili yana shirye, sabon wakili ya daɗe!
Haka ne, ya ‘yan uwa, mun kawo kashi 1/3 ne kawai na hanyar da wannan labarin ya shirya, amma kada ku karaya, domin yanzu za a samu sauki.
Don haka yanzu muna buƙatar wakili wanda ke tattara bayanan meta kuma ya sanya su cikin takaddar tattarawa:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Tun da wannan wakili zai aiwatar da bayanai game da takamaiman tsaro, muna buƙatar nuna alamar (alama) na wannan tsaro a cikin saƙon. Don wannan dalili a faust akwai records - azuzuwan da ke bayyana tsarin saƙo a cikin batun wakili.
A wannan yanayin, bari mu je rubuce-rubuce.pyda kuma bayyana yadda sakon wannan batu ya kamata ya kasance:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Kamar yadda kuke tsammani, faust yana amfani da nau'in python annotation don bayyana tsarin saƙon, wanda shine dalilin da yasa mafi ƙarancin sigar da ɗakin karatu ke tallafawa shine. 3.6.
Bari mu koma kan wakili, saita nau'ikan kuma mu ƙara:
Kamar yadda kuke gani, mun wuce sabon siga tare da tsari zuwa hanyar ƙaddamar da taken - value_type. Bugu da ari, duk abin da ya bi wannan makirci, don haka ban ga wani ma'ana a zauna a kan wani abu.
Da kyau, taɓawa ta ƙarshe shine ƙara kira zuwa wakilin tattara bayanan meta don tattara_securitites:
Muna amfani da tsarin da aka sanar a baya don saƙon. A wannan yanayin, na yi amfani da hanyar .cast tun da ba mu buƙatar jira sakamakon daga wakili, amma yana da daraja ambaton hakan. hanyoyi aika sako zuwa ga batun:
simintin gyare-gyare - baya toshewa saboda baya tsammanin sakamako. Ba za ku iya aika sakamakon zuwa wani batu azaman saƙo ba.
aika - baya toshewa saboda baya tsammanin sakamako. Kuna iya ƙayyade wakili a cikin batun da sakamakon zai tafi.
tambaya - yana jiran sakamako. Kuna iya ƙayyade wakili a cikin batun da sakamakon zai tafi.
Don haka, wannan ke nan tare da wakilai na yau!
Ƙungiyar mafarki
Abu na karshe da na yi alkawarin rubutawa a wannan bangare shi ne umarni. Kamar yadda aka ambata a baya, umarni a cikin faust sune makullin kusa da dannawa. A zahiri, faust kawai yana haɗa umarnin mu na al'ada zuwa ƙirar sa lokacin da aka ƙayyade maɓallin -A
Bayan an sanar da wakilai a wakilai.py ƙara aiki tare da kayan ado app.umarnikiran hanyar jefa у tattara_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Don haka, idan muka kira jerin umarni, sabon umarnin mu zai kasance a ciki:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Za mu iya amfani da shi kamar kowa, don haka bari mu sake kunna ma'aikacin faust kuma mu fara cikakken tarin abubuwan tsaro:
> faust -A horton.agents start-collect-securities
Me zai faru a gaba?
A bangare na gaba, ta yin amfani da sauran wakilai a matsayin misali, za mu yi la'akari da hanyar nutsewa don neman matsananciyar farashin ciniki na shekara da ƙaddamar da cron na wakilai.