Se konsa, se konsa, dezyèm pati a. Jan sa ekri pi bonè, nan li nou pral fè bagay sa yo:
Ann ekri yon ti kliyan pou alphavantage sou aiohttp ak demann pou pwen final nou bezwen yo.
Ann kreye yon ajan ki pral kolekte done sou sekirite ak meta enfòmasyon sou yo.
Men, sa a se sa nou pral fè pou pwojè a tèt li, ak an tèm de rechèch faust, nou pral aprann ki jan yo ekri ajan ki trete evènman kouran soti nan kafka, osi byen ke ki jan yo ekri kòmandman (klike sou wrapper), nan ka nou an - pou mesaj pouse manyèl nan sijè a ke ajan an ap kontwole.
Fòmasyon
AlphaVantage Kliyan
Premyèman, se pou yo ekri yon ti kliyan aiohttp pou demann alphavantage.
Pou kounye a nou pral gen kreyasyon aplikasyon ki pi senp, yon ti kras pita nou pral elaji li, sepandan, yo nan lòd yo pa kenbe ou ap tann, isit la referans a App-klas. Mwen konseye w tou pran yon gade nan klas la anviwònman, paske li responsab pou pifò nan anviwònman yo.
Se konsa, premye nou jwenn objè aplikasyon faust la - li byen senp. Apre sa, nou klèman deklare yon sijè pou ajan nou an... Isit la li vo mansyone sa li ye, ki sa paramèt entèn la se ak ki jan sa a ka ranje yon fason diferan.
Sijè nan kafka, si nou vle konnen definisyon egzak la, li pi bon pou li koupe. dokiman, oswa ou ka li konpendim sou Habré nan Ris, kote tout bagay tou reflete byen avèk presizyon :)
Paramèt entèn, ki dekri byen nan doc faust a, pèmèt nou konfigirasyon sijè a dirèkteman nan kòd la, nan kou, sa vle di paramèt yo bay devlopè yo faust, pou egzanp: retansyon, retansyon politik (pa default efase, men ou ka mete kontra enfòmèl ant), kantite patisyon pou chak sijè (nòtfè, pou egzanp, mwens pase siyifikasyon mondyal aplikasyon pou faust).
An jeneral, ajan an ka kreye yon sijè jere ak valè mondyal, sepandan, mwen renmen deklare tout bagay klèman. Anplis de sa, kèk paramèt (pa egzanp, kantite patisyon oswa politik retansyon) nan sijè a nan reklam ajan an pa ka configuré.
Men sa li ta ka sanble san yo pa defini sijè a manyèlman:
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Se konsa, nan kòmansman ajan an, nou louvri yon sesyon aiohttp pou demann atravè kliyan nou an. Kidonk, lè yo kòmanse yon travayè, lè ajan nou an lanse, yon sesyon pral imedyatman louvri - youn, pou tout tan travayè a ap kouri (oswa plizyè, si ou chanje paramèt la). konkouran soti nan yon ajan ki gen yon inite default).
Apre sa, nou swiv kouran an (nou mete mesaj la nan _, depi nou, nan ajan sa a, pa pran swen sou kontni an) nan mesaj ki soti nan sijè nou an, si yo egziste nan konpanse aktyèl la, otreman sik nou an ap tann pou rive yo. Oke, andedan bouk nou an, nou konekte resi mesaj la, jwenn yon lis sekirite aktif (get_securities retounen sèlman aktif pa default, gade kòd kliyan) epi sove li nan baz done a, tcheke si gen yon sekirite ki gen menm ticker la ak echanj nan baz done a, si gen, Lè sa a, li (papye a) pral tou senpleman mete ajou.
Ann lanse kreyasyon nou an!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS Karakteristik eleman entènèt Mwen pa pral konsidere faust nan atik yo, kidonk nou mete drapo ki apwopriye a.
Nan lòd lansman nou an, nou te di faust ki kote pou chèche objè aplikasyon an ak sa pou w fè ak li (lanse yon travayè) ak nivo pwodiksyon boutèy enfòmasyon an. Nou jwenn pwodiksyon sa a:
Ann gade nan seri patisyon an. Kòm nou ka wè, yo te kreye yon sijè ak non ke nou deziyen nan kòd la, nimewo a default nan patisyon (8, pran nan topic_partitions - paramèt objè aplikasyon), paske nou pa t presize yon valè endividyèl pou sijè nou an (via partisyon). Ajan an te lanse nan travayè a asiyen tout 8 patisyon, paske li se youn nan sèlman, men sa a pral diskite an plis detay nan pati a sou clustering.
Oke, kounye a nou ka ale nan yon lòt fenèt tèminal epi voye yon mesaj vid nan sijè nou an:
PS itilize @ nou montre ke nou ap voye yon mesaj nan yon sijè ki rele "collect_securities".
Nan ka sa a, mesaj la te ale nan patisyon 6 - ou ka tcheke sa a pa ale nan kafdrop sou localhost:9000
Ale nan fennèt tèminal la ak travayè nou an, nou pral wè yon mesaj kontan voye lè l sèvi avèk loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Nou ka gade tou nan mongo (lè l sèvi avèk Robo3T oswa Studio3T) epi wè ke sekirite yo nan baz done a:
Mwen pa yon bilyonè, e se poutèt sa nou kontante nou ak premye opsyon gade.
Bonè ak kè kontan - premye ajan an pare :)
Ajan pare, viv nouvo ajan an!
Wi, mesye, nou te sèlman kouvri 1/3 nan chemen an prepare nan atik sa a, men pa dekouraje, paske kounye a li pral pi fasil.
Se konsa, kounye a nou bezwen yon ajan ki kolekte meta enfòmasyon epi mete l nan yon dokiman koleksyon:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Piske ajan sa a pral trete enfòmasyon sou yon sekirite espesifik, nou bezwen endike ticker (senbòl) sekirite sa a nan mesaj la. Pou rezon sa a nan faust genyen Albòm — klas ki deklare konplo mesaj la nan sijè ajan an.
Nan ka sa a, ann ale nan dosye.pyepi dekri ki jan mesaj pou sijè sa a ta dwe sanble:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Kòm ou ta ka devine, faust sèvi ak anotasyon tip python pou dekri chema mesaj la, ki se poukisa vèsyon minimòm bibliyotèk la sipòte se 3.6.
Ann retounen nan ajan an, mete kalite yo epi ajoute li:
Kòm ou ka wè, nou pase yon nouvo paramèt ak yon konplo nan metòd inisyalizasyon sijè a - value_type. Pli lwen, tout bagay swiv menm konplo a, kidonk mwen pa wè okenn pwen nan rete sou nenpòt lòt bagay.
Oke, manyen final la se ajoute yon apèl nan ajan koleksyon meta enfòmasyon pou collect_securitites:
Nou itilize konplo a te anonse deja pou mesaj la. Nan ka sa a, mwen te itilize metòd .cast la paske nou pa bezwen tann rezilta nan ajan an, men li vo mansyone ke fason voye yon mesaj sou sijè a:
jete - pa bloke paske li pa atann yon rezilta. Ou pa ka voye rezilta a nan yon lòt sijè kòm yon mesaj.
voye - pa bloke paske li pa atann yon rezilta. Ou ka presize yon ajan nan sijè ki rezilta a pral ale.
mande - tann yon rezilta. Ou ka presize yon ajan nan sijè ki rezilta a pral ale.
Se konsa, sa a tout ak ajan pou jodi a!
Ekip rèv la
Dènye bagay mwen te pwomèt pou m ekri nan pati sa a se kòmandman. Kòm mansyone pi bonè, kòmandman nan faust se yon wrapper alantou klike sou. An reyalite, faust tou senpleman atache kòmand koutim nou an nan koòdone li yo lè li presize kle a -A
Apre ajan yo te anonse nan agents.py ajoute yon fonksyon ak yon dekoratè app.commandrele metòd la jete у kolekte_sekirite:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Kidonk, si nou rele lis kòmandman an, nouvo kòmandman nou an pral ladan l:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Nou ka sèvi ak li tankou nenpòt lòt moun, kidonk ann rekòmanse travayè faust la epi kòmanse yon koleksyon sekirite konplè:
> faust -A horton.agents start-collect-securities
Kisa ki pral rive apre?
Nan pwochen pati a, lè l sèvi avèk ajan ki rete yo kòm yon egzanp, nou pral konsidere mekanis nan koule pou chèche ekstrèm nan pri yo fèmen nan komès pou ane a ak lansman an cron nan ajan.