तो, तो, दूसरा भाग। जैसा कि पहले लिखा गया है, इसमें हम निम्नलिखित कार्य करेंगे:
आइए aiohttp पर अल्फ़ावेंटेज के लिए एक छोटा क्लाइंट लिखें जिसमें हमें आवश्यक अंतिम बिंदुओं के अनुरोध हों।
आइए एक एजेंट बनाएं जो प्रतिभूतियों पर डेटा और उन पर मेटा जानकारी एकत्र करेगा।
लेकिन, यह वही है जो हम प्रोजेक्ट के लिए करेंगे, और फॉस्ट रिसर्च के संदर्भ में, हम सीखेंगे कि एजेंटों को कैसे लिखना है जो काफ्का से स्ट्रीम घटनाओं को संसाधित करते हैं, साथ ही हमारे मामले में कमांड (क्लिक रैपर) कैसे लिखते हैं - उस विषय पर मैन्युअल पुश संदेशों के लिए जिसकी एजेंट निगरानी कर रहा है।
ट्रेनिंग
अल्फ़ावेंटेज क्लाइंट
सबसे पहले, अल्फावेटेज के अनुरोधों के लिए एक छोटा aiohttp क्लाइंट लिखें।
अल्फ़ावेंटेज एपीआई काफी सरल और खूबसूरती से डिज़ाइन किया गया है, इसलिए मैंने इस विधि के माध्यम से सभी अनुरोध करने का निर्णय लिया construct_query जहां बदले में एक http कॉल होती है।
मैं सभी फ़ील्ड लाता हूँ snake_case सहूलियत के लिए।
खैर, सुंदर और जानकारीपूर्ण ट्रेसबैक आउटपुट के लिए लॉगर.कैच सजावट।
PS स्थानीय रूप से config.yml में अल्फ़ावांटेज टोकन जोड़ना या पर्यावरण चर निर्यात करना न भूलें HORTON_SERVICE_APIKEY. हमें एक टोकन मिलता है यहां.
सीआरयूडी वर्ग
प्रतिभूतियों के बारे में मेटा जानकारी संग्रहीत करने के लिए हमारे पास प्रतिभूतियों का संग्रह होगा।
अभी हमारे पास सबसे सरल एप्लिकेशन निर्माण होगा, थोड़ी देर बाद हम इसका विस्तार करेंगे, हालांकि, आपको इंतजार न कराने के लिए, यहां प्रतिक्रिया दें संदर्भ ऐप-क्लास के लिए। मैं आपको सेटिंग क्लास पर एक नज़र डालने की भी सलाह देता हूं, क्योंकि यह अधिकांश सेटिंग्स के लिए जिम्मेदार है।
मुख्य भाग
प्रतिभूतियों की सूची एकत्र करने और बनाए रखने के लिए एजेंट
तो, सबसे पहले हमें फ़ॉस्ट एप्लिकेशन ऑब्जेक्ट मिलता है - यह काफी सरल है। इसके बाद, हम स्पष्ट रूप से अपने एजेंट के लिए एक विषय घोषित करते हैं... यहां यह उल्लेख करना उचित है कि यह क्या है, आंतरिक पैरामीटर क्या है और इसे अलग तरीके से कैसे व्यवस्थित किया जा सकता है।
काफ्का में विषय, यदि हम सटीक परिभाषा जानना चाहते हैं, तो इसे पढ़ना बेहतर है बंद। दस्तावेज़, या आप पढ़ सकते हैं सारांश रूसी में हब पर, जहां सब कुछ काफी सटीक रूप से परिलक्षित होता है :)
पैरामीटर आंतरिक, फ़ॉस्ट डॉक में काफी अच्छी तरह से वर्णित है, हमें विषय को सीधे कोड में कॉन्फ़िगर करने की अनुमति देता है, निश्चित रूप से, इसका मतलब है कि फ़ॉस्ट डेवलपर्स द्वारा प्रदान किए गए पैरामीटर, उदाहरण के लिए: अवधारण, अवधारण नीति (डिफ़ॉल्ट रूप से हटाएं, लेकिन आप सेट कर सकते हैं सघन), प्रति विषय विभाजन की संख्या (बीसियोंउदाहरण के लिए, इससे कम करना वैश्विक महत्व एप्लिकेशन फ़ॉस्ट)।
सामान्य तौर पर, एजेंट वैश्विक मूल्यों के साथ एक प्रबंधित विषय बना सकता है, हालांकि, मैं सब कुछ स्पष्ट रूप से घोषित करना पसंद करता हूं। इसके अलावा, एजेंट विज्ञापन में विषय के कुछ पैरामीटर (उदाहरण के लिए, विभाजन की संख्या या अवधारण नीति) को कॉन्फ़िगर नहीं किया जा सकता है।
विषय को मैन्युअल रूप से परिभाषित किए बिना यह कैसा दिख सकता है:
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
इसलिए, एजेंट की शुरुआत में, हम अपने क्लाइंट के माध्यम से अनुरोधों के लिए एक aiohttp सत्र खोलते हैं। इस प्रकार, एक कार्यकर्ता शुरू करते समय, जब हमारा एजेंट लॉन्च किया जाता है, तो एक सत्र तुरंत खोला जाएगा - एक, पूरे समय कार्यकर्ता चल रहा है (या कई, यदि आप पैरामीटर बदलते हैं संगामिति एक डिफ़ॉल्ट इकाई वाले एजेंट से)।
इसके बाद, हम स्ट्रीम का अनुसरण करते हैं (हम संदेश डालते हैं _, चूंकि हम, इस एजेंट में, अपने विषय के संदेशों की सामग्री की परवाह नहीं करते हैं, यदि वे वर्तमान ऑफसेट पर मौजूद हैं, अन्यथा हमारा चक्र उनके आगमन की प्रतीक्षा करेगा। खैर, हमारे लूप के अंदर, हम संदेश की रसीद लॉग करते हैं, सक्रिय की एक सूची प्राप्त करते हैं (get_securities केवल डिफ़ॉल्ट रूप से सक्रिय होती है, क्लाइंट कोड देखें) प्रतिभूतियों और इसे डेटाबेस में सहेजते हैं, जांचते हैं कि क्या समान टिकर के साथ कोई सुरक्षा है और डेटाबेस में विनिमय, अगर वहाँ है, तो यह (कागज) बस अद्यतन किया जाएगा।
आइए अपनी रचना लॉन्च करें!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
पीएस सुविधाएँ वेब घटक मैं लेखों में फ़ॉस्ट पर विचार नहीं करूंगा, इसलिए हमने उपयुक्त ध्वज स्थापित किया है।
हमारे लॉन्च कमांड में, हमने फ़ॉस्ट को बताया कि एप्लिकेशन ऑब्जेक्ट को कहां देखना है और जानकारी लॉग आउटपुट स्तर के साथ इसके साथ क्या करना है (एक कार्यकर्ता को लॉन्च करना)। हमें निम्नलिखित आउटपुट मिलता है:
आइए विभाजन सेट को देखें. जैसा कि हम देख सकते हैं, एक विषय उस नाम से बनाया गया था जिसे हमने कोड में निर्दिष्ट किया था, विभाजन की डिफ़ॉल्ट संख्या (8, से ली गई) विषय_विभाजन - एप्लिकेशन ऑब्जेक्ट पैरामीटर), चूंकि हमने अपने विषय के लिए (विभाजन के माध्यम से) कोई व्यक्तिगत मान निर्दिष्ट नहीं किया है। वर्कर में लॉन्च किए गए एजेंट को सभी 8 विभाजन सौंपे गए हैं, क्योंकि यह एकमात्र है, लेकिन क्लस्टरिंग के बारे में भाग में इस पर अधिक विस्तार से चर्चा की जाएगी।
खैर, अब हम दूसरे टर्मिनल विंडो पर जा सकते हैं और अपने विषय पर एक खाली संदेश भेज सकते हैं:
पी.एस. का उपयोग कर रहा हूँ @ हम दिखाते हैं कि हम "collect_securities" नामक विषय पर एक संदेश भेज रहे हैं।
इस स्थिति में, संदेश पार्टीशन 6 पर चला गया - आप इसे kafdrop on पर जाकर जांच सकते हैं localhost:9000
अपने कार्यकर्ता के साथ टर्मिनल विंडो पर जाकर, हम लॉगुरू का उपयोग करके भेजा गया एक सुखद संदेश देखेंगे:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
हम मोंगो (रोबो3टी या स्टूडियो3टी का उपयोग करके) पर भी गौर कर सकते हैं और देख सकते हैं कि प्रतिभूतियाँ डेटाबेस में हैं:
मैं अरबपति नहीं हूं, और इसलिए हम पहले देखने के विकल्प से संतुष्ट हैं।
खुशी और आनंद - पहला एजेंट तैयार है :)
एजेंट तैयार, नये एजेंट जिंदाबाद!
हां, सज्जनों, हमने इस लेख द्वारा तैयार किए गए पथ का केवल 1/3 भाग ही कवर किया है, लेकिन निराश न हों, क्योंकि अब यह आसान हो जाएगा।
तो अब हमें एक ऐसे एजेंट की आवश्यकता है जो मेटा जानकारी एकत्र करे और उसे एक संग्रह दस्तावेज़ में रखे:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
चूँकि यह एजेंट एक विशिष्ट सुरक्षा के बारे में जानकारी संसाधित करेगा, इसलिए हमें संदेश में इस सुरक्षा के टिकर (प्रतीक) को इंगित करना होगा। इस प्रयोजन के लिए फ़ॉस्ट में हैं अभिलेख - कक्षाएं जो एजेंट विषय में संदेश योजना घोषित करती हैं।
इस मामले में, आइए चलते हैं रिकॉर्ड्स.pyऔर वर्णन करें कि इस विषय के लिए संदेश कैसा दिखना चाहिए:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
जैसा कि आपने अनुमान लगाया होगा, फ़ॉस्ट संदेश स्कीमा का वर्णन करने के लिए पायथन प्रकार के एनोटेशन का उपयोग करता है, यही कारण है कि लाइब्रेरी द्वारा समर्थित न्यूनतम संस्करण है 3.6.
आइए एजेंट पर वापस लौटें, प्रकार सेट करें और इसे जोड़ें:
जैसा कि आप देख सकते हैं, हम विषय आरंभीकरण विधि में एक योजना के साथ एक नया पैरामीटर पास करते हैं - value_type। इसके अलावा, हर चीज़ एक ही योजना का अनुसरण करती है, इसलिए मुझे किसी और चीज़ पर ध्यान देने का कोई मतलब नहीं दिखता।
खैर, अंतिम स्पर्श मेटा सूचना संग्रह एजेंट को Collect_securitites में एक कॉल जोड़ना है:
हम संदेश के लिए पूर्व घोषित योजना का उपयोग करते हैं। इस मामले में, मैंने .कास्ट विधि का उपयोग किया क्योंकि हमें एजेंट से परिणाम की प्रतीक्षा करने की आवश्यकता नहीं है, लेकिन यह उल्लेख करने योग्य है способов विषय पर एक संदेश भेजें:
कास्ट - ब्लॉक नहीं करता क्योंकि यह किसी परिणाम की अपेक्षा नहीं करता है। आप परिणाम को किसी अन्य विषय पर संदेश के रूप में नहीं भेज सकते.
भेजें - ब्लॉक नहीं करता क्योंकि यह परिणाम की अपेक्षा नहीं करता है। आप उस विषय में एक एजेंट निर्दिष्ट कर सकते हैं जिस पर परिणाम जाएगा।
पूछना - परिणाम की प्रतीक्षा करता है। आप उस विषय में एक एजेंट निर्दिष्ट कर सकते हैं जिस पर परिणाम जाएगा।
तो, आज एजेंटों के लिए बस इतना ही!
सर्वश्रेष्ठ टीम
आखिरी चीज़ जो मैंने इस भाग में लिखने का वादा किया था वह है आदेश। जैसा कि पहले उल्लेख किया गया है, फ़ॉस्ट में कमांड क्लिक के चारों ओर एक आवरण हैं। वास्तव में, -A कुंजी निर्दिष्ट करते समय faust बस हमारे कस्टम कमांड को इसके इंटरफ़ेस से जोड़ देता है
एजेंटों की घोषणा के बाद Agents.py डेकोरेटर के साथ एक फ़ंक्शन जोड़ें ऐप.कमांडविधि को कॉल करना डालना у कलेक्ट_सिक्योरिटीज़:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
इस प्रकार, यदि हम आदेशों की सूची कहते हैं, तो हमारा नया आदेश उसमें होगा:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
हम इसे किसी अन्य की तरह उपयोग कर सकते हैं, तो आइए फ़ॉस्ट वर्कर को पुनः आरंभ करें और प्रतिभूतियों का पूर्ण संग्रह शुरू करें:
> faust -A horton.agents start-collect-securities
आगे क्या होगा?
अगले भाग में, शेष एजेंटों को एक उदाहरण के रूप में उपयोग करते हुए, हम वर्ष के लिए व्यापार की समापन कीमतों और एजेंटों के क्रॉन लॉन्च में चरम सीमाओं की खोज के लिए सिंक तंत्र पर विचार करेंगे।
पीएस पिछले भाग के तहत मुझसे फॉस्ट और कंफ्लुएंट काफ्का के बारे में पूछा गया था (कंफ्लुएंट में क्या विशेषताएं हैं?). ऐसा लगता है कि कंफ्लुएंट कई मायनों में अधिक कार्यात्मक है, लेकिन तथ्य यह है कि फॉस्ट के पास कंफ्लुएंट के लिए पूर्ण ग्राहक समर्थन नहीं है - यह इस प्रकार है दस्तावेज़ में ग्राहक प्रतिबंधों का विवरण.