కాబట్టి, రెండవ భాగం. ఇంతకు ముందు వ్రాసినట్లుగా, దానిలో మేము ఈ క్రింది వాటిని చేస్తాము:
aiohttpలో మనకు అవసరమైన ముగింపు పాయింట్ల కోసం అభ్యర్థనలతో ఆల్ఫావాంటేజ్ కోసం ఒక చిన్న క్లయింట్ని వ్రాద్దాం.
సెక్యూరిటీలు మరియు వాటిపై మెటా సమాచారాన్ని సేకరించే ఏజెంట్ని క్రియేట్ చేద్దాం.
కానీ, ప్రాజెక్ట్ కోసం మేము ఇదే చేస్తాము మరియు ఫస్ట్ రీసెర్చ్ పరంగా, కాఫ్కా నుండి స్ట్రీమ్ ఈవెంట్లను ప్రాసెస్ చేసే ఏజెంట్లను ఎలా వ్రాయాలో, అలాగే ఆదేశాలను ఎలా వ్రాయాలో (క్లిక్ రేపర్) నేర్చుకుంటాము - ఏజెంట్ పర్యవేక్షిస్తున్న అంశానికి మాన్యువల్ పుష్ సందేశాల కోసం.
శిక్షణ
AlphaVantage క్లయింట్
ముందుగా, ఆల్ఫావాంటేజ్కి అభ్యర్థనల కోసం చిన్న aiohttp క్లయింట్ని వ్రాద్దాం.
AlphaVantage API చాలా సరళంగా మరియు అందంగా రూపొందించబడింది, కాబట్టి నేను అన్ని అభ్యర్థనలను పద్ధతి ద్వారా చేయాలని నిర్ణయించుకున్నాను construct_query ఇక్కడ ఒక http కాల్ ఉంది.
నేను అన్ని రంగాలను తీసుకువస్తాను snake_case సౌలభ్యం కోసం.
బాగా, అందమైన మరియు ఇన్ఫర్మేటివ్ ట్రేస్బ్యాక్ అవుట్పుట్ కోసం లాగర్.క్యాచ్ అలంకరణ.
PS స్థానికంగా config.ymlకు ఆల్ఫావాంటేజ్ టోకెన్ని జోడించడం లేదా ఎన్విరాన్మెంట్ వేరియబుల్ని ఎగుమతి చేయడం మర్చిపోవద్దు HORTON_SERVICE_APIKEY. మేము టోకెన్ అందుకుంటాము ఇక్కడ.
CRUD తరగతి
సెక్యూరిటీల గురించి మెటా సమాచారాన్ని నిల్వ చేయడానికి మేము సెక్యూరిటీల సేకరణను కలిగి ఉంటాము.
ప్రస్తుతానికి మేము సరళమైన అప్లికేషన్ సృష్టిని కలిగి ఉన్నాము, కొద్దిసేపటి తర్వాత మేము దానిని విస్తరిస్తాము, అయినప్పటికీ, మీరు వేచి ఉండకుండా ఉండటానికి, ఇక్కడ ప్రస్తావనలు యాప్-తరగతికి. సెట్టింగుల తరగతిని పరిశీలించమని కూడా నేను మీకు సలహా ఇస్తున్నాను, ఎందుకంటే ఇది చాలా సెట్టింగులకు బాధ్యత వహిస్తుంది.
ప్రధాన శరీరం
సెక్యూరిటీల జాబితాను సేకరించడం మరియు నిర్వహించడం కోసం ఏజెంట్
కాబట్టి, మొదట మనం ఫాస్ట్ అప్లికేషన్ ఆబ్జెక్ట్ను పొందుతాము - ఇది చాలా సులభం. తరువాత, మేము మా ఏజెంట్ కోసం ఒక అంశాన్ని స్పష్టంగా ప్రకటిస్తాము... ఇక్కడ అది ఏమిటో, అంతర్గత పరామితి ఏమిటి మరియు దీన్ని ఎలా విభిన్నంగా ఏర్పాటు చేయవచ్చో పేర్కొనడం విలువ.
కాఫ్కాలోని అంశాలు, ఖచ్చితమైన నిర్వచనం తెలుసుకోవాలంటే, చదవడం మంచిది ఆఫ్. పత్రం, లేదా మీరు చదువుకోవచ్చు సంగ్రహం రష్యన్లో హబ్రేలో, ప్రతిదీ కూడా చాలా ఖచ్చితంగా ప్రతిబింబిస్తుంది :)
అంతర్గత పరామితి, ఫాస్ట్ డాక్లో బాగా వివరించబడింది, టాపిక్ను నేరుగా కోడ్లో కాన్ఫిగర్ చేయడానికి మమ్మల్ని అనుమతిస్తుంది, అయితే, దీని అర్థం ఫౌస్ట్ డెవలపర్లు అందించిన పారామితులు, ఉదాహరణకు: నిలుపుదల, నిలుపుదల విధానం (డిఫాల్ట్గా తొలగించండి, కానీ మీరు సెట్ చేయవచ్చు కాంపాక్ట్), ప్రతి అంశానికి విభజనల సంఖ్య (విభజనలుచేయడానికి, ఉదాహరణకు, కంటే తక్కువ ప్రపంచ ప్రాముఖ్యత అప్లికేషన్లు ఫాస్ట్).
సాధారణంగా, ఏజెంట్ గ్లోబల్ విలువలతో నిర్వహించబడే అంశాన్ని సృష్టించవచ్చు, అయినప్పటికీ, నేను ప్రతి విషయాన్ని స్పష్టంగా ప్రకటించాలనుకుంటున్నాను. అదనంగా, ఏజెంట్ ప్రకటనలో టాపిక్ యొక్క కొన్ని పారామితులు (ఉదాహరణకు, విభజనల సంఖ్య లేదా నిలుపుదల విధానం) కాన్ఫిగర్ చేయబడవు.
అంశాన్ని మాన్యువల్గా నిర్వచించకుండా ఇది ఎలా ఉంటుందో ఇక్కడ ఉంది:
సరే, ఇప్పుడు మా ఏజెంట్ ఏమి చేస్తాడో వివరిస్తాము :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
కాబట్టి, ఏజెంట్ ప్రారంభంలో, మేము మా క్లయింట్ ద్వారా అభ్యర్థనల కోసం aiohttp సెషన్ను తెరుస్తాము. ఈ విధంగా, ఒక కార్మికుడిని ప్రారంభించినప్పుడు, మా ఏజెంట్ ప్రారంభించబడినప్పుడు, ఒక సెషన్ వెంటనే తెరవబడుతుంది - ఒకటి, మొత్తం సమయం కోసం కార్మికుడు నడుస్తున్నాడు (లేదా అనేక, మీరు పరామితిని మార్చినట్లయితే అనుకూలత డిఫాల్ట్ యూనిట్ ఉన్న ఏజెంట్ నుండి).
తరువాత, మేము స్ట్రీమ్ను అనుసరిస్తాము (మేము సందేశాన్ని ఉంచుతాము _, మేము, ఈ ఏజెంట్లో, మా అంశం నుండి వచ్చే సందేశాల) కంటెంట్ గురించి పట్టించుకోము కాబట్టి, అవి ప్రస్తుత ఆఫ్సెట్లో ఉంటే, లేకపోతే మా చక్రం వారి రాక కోసం వేచి ఉంటుంది. సరే, మా లూప్ లోపల, మేము సందేశం యొక్క రసీదుని లాగ్ చేస్తాము, సక్రియ (get_securities డిఫాల్ట్గా మాత్రమే యాక్టివ్గా మాత్రమే రిటర్న్లు, క్లయింట్ కోడ్ను చూడండి) సెక్యూరిటీల జాబితాను పొందండి మరియు డేటాబేస్లో సేవ్ చేసి, అదే టిక్కర్తో భద్రత ఉందో లేదో తనిఖీ చేస్తాము మరియు డేటాబేస్లో మార్పిడి , ఉంటే, అది (కాగితం) కేవలం నవీకరించబడుతుంది.
మన సృష్టిని ప్రారంభిద్దాం!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS ఫీచర్లు వెబ్ భాగం నేను వ్యాసాలలో ఫౌస్ట్ను పరిగణించను, కాబట్టి మేము తగిన జెండాను సెట్ చేసాము.
మా లాంచ్ కమాండ్లో, సమాచార లాగ్ అవుట్పుట్ స్థాయితో అప్లికేషన్ ఆబ్జెక్ట్ కోసం ఎక్కడ వెతకాలి మరియు దానితో ఏమి చేయాలి (వర్కర్ను ప్రారంభించండి) అని మేము ఫాస్ట్కు చెప్పాము. మేము ఈ క్రింది అవుట్పుట్ను పొందుతాము:
విభజన సమితిని చూద్దాం. మనం చూడగలిగినట్లుగా, మేము కోడ్లో నిర్దేశించిన పేరుతో, డిఫాల్ట్ విభజనల సంఖ్య (8, నుండి తీసుకోబడింది)తో ఒక అంశం సృష్టించబడింది. అంశం_విభజనలు - అప్లికేషన్ ఆబ్జెక్ట్ పరామితి), ఎందుకంటే మేము మా అంశం కోసం వ్యక్తిగత విలువను పేర్కొనలేదు (విభజనల ద్వారా). వర్కర్లో ప్రారంభించబడిన ఏజెంట్కు మొత్తం 8 విభజనలు కేటాయించబడతాయి, ఎందుకంటే ఇది ఒక్కటే, అయితే ఇది క్లస్టరింగ్ గురించి భాగంలో మరింత వివరంగా చర్చించబడుతుంది.
సరే, ఇప్పుడు మనం మరొక టెర్మినల్ విండోకు వెళ్లి మా అంశానికి ఖాళీ సందేశాన్ని పంపవచ్చు:
PS ఉపయోగిస్తోంది @ మేము "collect_securities" అనే టాపిక్కి సందేశాన్ని పంపుతున్నామని చూపిస్తాము.
ఈ సందర్భంలో, సందేశం విభజన 6కి వెళ్లింది - మీరు kafdrop ఆన్కి వెళ్లడం ద్వారా దీన్ని తనిఖీ చేయవచ్చు localhost:9000
మా వర్కర్తో టెర్మినల్ విండోకు వెళితే, లోగురుని ఉపయోగించి పంపబడిన సంతోషకరమైన సందేశాన్ని చూస్తాము:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
మేము మొంగో (Robo3T లేదా Studio3T ఉపయోగించి)ని కూడా చూడవచ్చు మరియు సెక్యూరిటీలు డేటాబేస్లో ఉన్నాయని చూడవచ్చు:
నేను బిలియనీర్ కాదు, కాబట్టి మేము మొదటి వీక్షణ ఎంపికతో సంతృప్తి చెందాము.
ఆనందం మరియు ఆనందం - మొదటి ఏజెంట్ సిద్ధంగా ఉంది :)
ఏజెంట్ సిద్ధంగా ఉన్నారు, కొత్త ఏజెంట్ చిరకాలం జీవించండి!
అవును, పెద్దమనుషులు, మేము ఈ వ్యాసం ద్వారా సిద్ధం చేసిన మార్గంలో 1/3 మాత్రమే కవర్ చేసాము, కానీ నిరుత్సాహపడకండి, ఎందుకంటే ఇప్పుడు అది సులభం అవుతుంది.
కాబట్టి ఇప్పుడు మనకు మెటా సమాచారాన్ని సేకరించి సేకరణ పత్రంలో ఉంచే ఏజెంట్ కావాలి:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
ఈ ఏజెంట్ నిర్దిష్ట భద్రతకు సంబంధించిన సమాచారాన్ని ప్రాసెస్ చేస్తుంది కాబట్టి, మేము సందేశంలో ఈ భద్రత యొక్క టిక్కర్ (చిహ్నం)ని సూచించాలి. ఫాస్ట్ లో ఈ ప్రయోజనం కోసం ఉన్నాయి రికార్డ్స్ — ఏజెంట్ అంశంలో సందేశ పథకాన్ని ప్రకటించే తరగతులు.
ఈ సందర్భంలో, వెళ్దాం records.pyమరియు ఈ అంశానికి సంబంధించిన సందేశం ఎలా ఉండాలో వివరించండి:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
మీరు ఊహించినట్లుగా, సందేశ స్కీమాను వివరించడానికి ఫాస్ట్ పైథాన్ రకం ఉల్లేఖనాన్ని ఉపయోగిస్తుంది, అందుకే లైబ్రరీ మద్దతు ఇచ్చే కనీస సంస్కరణ 3.6.
ఏజెంట్కి తిరిగి వెళ్దాం, రకాలను సెట్ చేసి, జోడించు:
మీరు చూడగలిగినట్లుగా, మేము టాపిక్ ప్రారంభ పద్ధతికి కొత్త పరామితిని స్కీమ్తో పాస్ చేస్తాము - value_type. ఇంకా, ప్రతిదీ ఒకే స్కీమ్ను అనుసరిస్తుంది, కాబట్టి నేను దేనిపైనా దృష్టి పెట్టడం లేదు.
సరే, కలెక్ట్_సెక్యూరిటీస్కి మెటా ఇన్ఫర్మేషన్ కలెక్షన్ ఏజెంట్కి కాల్ని జోడించడం చివరి టచ్:
మేము సందేశం కోసం గతంలో ప్రకటించిన పథకాన్ని ఉపయోగిస్తాము. ఈ సందర్భంలో, నేను ఏజెంట్ నుండి ఫలితం కోసం వేచి ఉండాల్సిన అవసరం లేదు కాబట్టి నేను .cast పద్ధతిని ఉపయోగించాను, కానీ అది ప్రస్తావించదగినది మార్గాలు అంశానికి సందేశం పంపండి:
తారాగణం - నిరోధించదు ఎందుకంటే ఇది ఫలితాన్ని ఆశించదు. మీరు ఫలితాన్ని మరొక అంశానికి సందేశంగా పంపలేరు.
పంపండి - నిరోధించదు ఎందుకంటే ఇది ఫలితాన్ని ఆశించదు. ఫలితం వెళ్లే అంశంలో మీరు ఏజెంట్ను పేర్కొనవచ్చు.
అడగండి - ఫలితం కోసం వేచి ఉంది. ఫలితం వెళ్లే అంశంలో మీరు ఏజెంట్ను పేర్కొనవచ్చు.
కాబట్టి, ఈరోజు ఏజెంట్లతో అంతే!
కలల బృందం
ఈ భాగంలో వ్రాయడానికి నేను వాగ్దానం చేసిన చివరి విషయం ఆదేశాలు. ఇంతకు ముందే చెప్పినట్లుగా, ఫాస్ట్లో ఉన్న కమాండ్లు క్లిక్ చుట్టూ చుట్టేవి. వాస్తవానికి, -A కీని పేర్కొనేటప్పుడు ఫౌస్ట్ మా అనుకూల ఆదేశాన్ని దాని ఇంటర్ఫేస్కు జత చేస్తుంది
లో ప్రకటించిన ఏజెంట్ల తర్వాత agents.py డెకరేటర్తో ఫంక్షన్ను జోడించండి app.commandపద్ధతిని పిలుస్తోంది తారాగణంగా у సేకరణ_సెక్యూరిటీలు:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
కాబట్టి, మేము ఆదేశాల జాబితాను కాల్ చేస్తే, మన కొత్త ఆదేశం దానిలో ఉంటుంది:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
మేము దీన్ని ఇతరుల మాదిరిగానే ఉపయోగించవచ్చు, కాబట్టి ఫాస్ట్ వర్కర్ని పునఃప్రారంభించి, పూర్తి స్థాయి సెక్యూరిటీల సేకరణను ప్రారంభిద్దాం:
> faust -A horton.agents start-collect-securities
తర్వాత ఏం జరుగుతుంది?
తరువాతి భాగంలో, మిగిలిన ఏజెంట్లను ఉదాహరణగా ఉపయోగించి, సంవత్సరానికి సంబంధించిన ట్రేడింగ్ ముగింపు ధరలలో మరియు ఏజెంట్ల క్రాన్ లాంచ్లో తీవ్రతలను శోధించడానికి మేము సింక్ మెకానిజంను పరిశీలిస్తాము.
PS చివరి భాగం కింద నన్ను ఫౌస్ట్ మరియు సంగమ కాఫ్కా గురించి అడిగారు (సంగమం ఏ లక్షణాలను కలిగి ఉంది?) కాన్ఫ్లూయెంట్ అనేక విధాలుగా మరింత క్రియాత్మకంగా ఉన్నట్లు అనిపిస్తుంది, అయితే వాస్తవం ఏమిటంటే ఫౌస్ట్కు సంగమానికి పూర్తి క్లయింట్ మద్దతు లేదు - ఇది క్రింది నుండి పత్రంలో క్లయింట్ పరిమితుల వివరణలు.