Այսպիսով, այսպես, երկրորդ մասը: Ինչպես ավելի վաղ գրվել է, դրանում մենք կանենք հետևյալը.
Եկեք գրենք փոքր հաճախորդ alphavantage-ի համար aiohttp-ում՝ մեզ անհրաժեշտ վերջնակետերի հարցումներով:
Եկեք ստեղծենք գործակալ, ով կհավաքի տվյալներ արժեթղթերի և դրանց վերաբերյալ մետա տեղեկատվություն:
Բայց սա այն է, ինչ մենք կանենք հենց նախագծի համար, և ֆաուստ հետազոտության առումով մենք կսովորենք, թե ինչպես գրել գործակալներ, որոնք մշակում են իրադարձությունները կաֆկայից, ինչպես նաև ինչպես գրել հրամաններ (սեղմեք փաթաթան), մեր դեպքում՝ ձեռքով հրում հաղորդագրությունների համար այն թեմային, որը գործակալը վերահսկում է:
Ուսուցում
AlphaVantage հաճախորդ
Նախ, եկեք գրենք փոքրիկ aiohttp հաճախորդ alphavantage-ի հարցումների համար:
AlphaVantage API-ն բավականին պարզ և գեղեցիկ ձևավորված է, ուստի ես որոշեցի կատարել բոլոր հարցումները մեթոդի միջոցով construct_query որտեղ իր հերթին կա http զանգ.
Ես բերում եմ բոլոր դաշտերը snake_case հարմարավետության համար:
Դե, logger.catch ձևավորումը գեղեցիկ և տեղեկատվական հետքի արդյունքի համար:
Հ.Գ. Չմոռանաք ավելացնել alphavantage նշանը տեղում config.yml-ում կամ արտահանել շրջակա միջավայրի փոփոխականը: HORTON_SERVICE_APIKEY. Մենք ստանում ենք նշան այստեղ.
CRUD դաս
Մենք կունենանք արժեթղթերի հավաքածու՝ արժեթղթերի մասին մետա տեղեկատվությունը պահելու համար:
Առայժմ մենք կունենանք ամենապարզ հավելվածի ստեղծումը, մի փոքր ուշ այն կընդլայնենք, սակայն ձեզ սպասեցնելու համար այստեղ. հղումներ դեպի App-class: Ես նաև խորհուրդ եմ տալիս դիտել կարգավորումների դասը, քանի որ այն պատասխանատու է պարամետրերի մեծ մասի համար:
Հիմնական մասը
Արժեթղթերի ցուցակի հավաքագրման և պահպանման գործակալ
Այսպիսով, նախ մենք ստանում ենք faust հավելվածի օբյեկտը, դա բավականին պարզ է: Հաջորդը, մենք հստակորեն հայտարարում ենք թեմա մեր գործակալի համար... Այստեղ արժե նշել, թե ինչ է դա, որն է ներքին պարամետրը և ինչպես դա կարող է տարբեր կերպ դասավորվել:
Թեմաներ կաֆկայում, եթե ուզում ենք իմանալ ճշգրիտ սահմանումը, ավելի լավ է կարդալ անջատված է. փաստաթուղթ, կամ կարող եք կարդալ ամփոփագիր ռուսերեն Habré-ի վրա, որտեղ նույնպես ամեն ինչ բավականին ճշգրիտ է արտացոլված :)
Ներքին պարամետր, որը բավականին լավ նկարագրված է faust doc-ում, թույլ է տալիս մեզ կարգավորել թեման ուղղակիորեն կոդի մեջ, իհարկե, սա նշանակում է faust ծրագրավորողների կողմից տրամադրված պարամետրերը, օրինակ՝ պահպանում, պահպանման քաղաքականություն (կանխադրված ջնջել, բայց կարող եք սահմանել կուռ), բաժինների քանակը մեկ թեմայի համար (միավորներըանել, օրինակ, ավելի քիչ, քան համաշխարհային նշանակություն դիմումները faust):
Ընդհանուր առմամբ, գործակալը կարող է ստեղծել գլոբալ արժեքներով կառավարվող թեմա, այնուամենայնիվ, ես սիրում եմ ամեն ինչ հստակորեն հայտարարել: Բացի այդ, գործակալի գովազդի թեմայի որոշ պարամետրեր (օրինակ՝ բաժանումների քանակը կամ պահպանման քաղաքականությունը) չեն կարող կազմաձևվել:
Ահա թե ինչ կարող է լինել այն առանց թեման ձեռքով սահմանելու.
Դե, հիմա եկեք նկարագրենք, թե ինչ է անելու մեր գործակալը :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Այսպիսով, գործակալի սկզբում մենք բացում ենք aiohttp նիստը մեր հաճախորդի միջոցով հարցումների համար: Այսպիսով, աշխատող սկսելիս, երբ մեր գործակալը գործարկվի, անմիջապես կբացվի նիստ՝ մեկ՝ աշխատողի աշխատանքի ողջ ընթացքում (կամ մի քանիսը, եթե փոխեք պարամետրը զուգահեռություն լռելյայն միավոր ունեցող գործակալից):
Հաջորդը, մենք հետևում ենք հոսքին (մենք տեղադրում ենք հաղորդագրությունը _, քանի որ մենք՝ այս գործակալում, թքած ունենք մեր թեմայի հաղորդագրությունների բովանդակության վրա, եթե դրանք գոյություն ունեն ընթացիկ օֆսեթում, հակառակ դեպքում մեր ցիկլը կսպասի դրանց ժամանմանը: Դե, մեր օղակի ներսում մենք գրանցում ենք հաղորդագրության անդորրագիրը, ստանում ենք ակտիվ (get_securities վերադարձնում է միայն լռելյայն ակտիվ, տես հաճախորդի կոդը) արժեթղթերի ցուցակը և պահում ենք այն տվյալների բազայում՝ ստուգելով, թե արդյոք կա նույն նշանով արժեթուղթ և փոխանակում տվյալների բազայում, եթե կա, ապա այն (թուղթը) պարզապես կթարմացվի:
Եկեք գործարկենք մեր ստեղծագործությունը:
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS-ի առանձնահատկությունները վեբ բաղադրիչ Հոդվածներում ֆաուստը չեմ համարի, ուստի մենք համապատասխան դրոշ ենք սահմանել։
Մեր գործարկման հրամանում մենք Faust-ին ասացինք, թե որտեղ փնտրել հավելվածի օբյեկտը և ինչ անել դրա հետ (գործարկել աշխատող) տեղեկամատյանի ելքի մակարդակով: Մենք ստանում ենք հետևյալ արդյունքը.
Եկեք նայենք բաժանման հավաքածուին: Ինչպես տեսնում ենք, ստեղծվել է մի թեմա, որի անունը մենք նշել ենք կոդում, բաժանումների լռելյայն թիվը (8, վերցված է թեմա_բաժանումներ - հավելվածի օբյեկտի պարամետր), քանի որ մենք չենք նշել անհատական արժեք մեր թեմայի համար (բաժանումների միջոցով): Աշխատողում գործարկված գործակալին նշանակված են բոլոր 8 միջնորմները, քանի որ այն միակն է, բայց դա ավելի մանրամասն կքննարկվի կլաստերավորման մասին մասում։
Դե, հիմա մենք կարող ենք գնալ մեկ այլ տերմինալի պատուհան և դատարկ հաղորդագրություն ուղարկել մեր թեմային.
PS օգտագործելով @ մենք ցույց ենք տալիս, որ մենք հաղորդագրություն ենք ուղարկում «հավաքել_արժեթղթեր» անունով թեմային:
Այս դեպքում հաղորդագրությունը գնաց բաժին 6. կարող եք ստուգել սա՝ անցնելով kafdrop on localhost:9000
Անցնելով տերմինալի պատուհանը մեր աշխատողի հետ, մենք կտեսնենք ուրախ հաղորդագրություն, որը ուղարկվում է loguru-ի միջոցով.
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Մենք կարող ենք նաև նայել mongo-ին (օգտագործելով Robo3T կամ Studio3T) և տեսնել, որ արժեթղթերը գտնվում են տվյալների բազայում.
Ես միլիարդատեր չեմ, և հետևաբար մենք բավարարվում ենք դիտման առաջին տարբերակով։
Երջանկություն և ուրախություն - առաջին գործակալը պատրաստ է :)
Գործակալը պատրաստ է, կեցցե նոր գործակալը:
Այո՛, պարոնայք, մենք այս հոդվածով պատրաստած ճանապարհի միայն 1/3-ն ենք անցել, բայց մի հուսահատվեք, քանի որ հիմա ավելի հեշտ կլինի։
Այսպիսով, այժմ մեզ անհրաժեշտ է գործակալ, որը հավաքում է մետա տեղեկատվություն և տեղադրում այն հավաքածուի փաստաթղթում.
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Քանի որ այս գործակալը կմշակի որոշակի անվտանգության մասին տեղեկատվությունը, մենք պետք է հաղորդագրության մեջ նշենք այս անվտանգության նշանը (խորհրդանիշը): Այս նպատակով ֆաուստում կան Պահեստավորված նյութեր — դասեր, որոնք հայտարարում են հաղորդագրության սխեման գործակալի թեմայում:
Այս դեպքում, եկեք գնանք records.pyև նկարագրեք, թե ինչպիսին պետք է լինի այս թեմայի հաղորդագրությունը.
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Ինչպես կարող էիք կռահել, Ֆաուստը օգտագործում է python տեսակի անոտացիա՝ հաղորդագրության սխեման նկարագրելու համար, ինչի պատճառով գրադարանի կողմից աջակցվող նվազագույն տարբերակը. 3.6.
Եկեք վերադառնանք գործակալին, սահմանենք տեսակները և ավելացնենք այն.
Ինչպես տեսնում եք, մենք սխեմայով նոր պարամետր ենք փոխանցում թեմայի սկզբնավորման մեթոդին՝ value_type: Ավելին, ամեն ինչ հետևում է նույն սխեմային, ուստի ես որևէ այլ բանի վրա կանգ առնելու իմաստ չեմ տեսնում:
Դե, վերջին հպումը մետա տեղեկատվության հավաքագրման գործակալին զանգ ավելացնելն է՝ collect_securitites:
Հաղորդագրության համար օգտագործում ենք նախկինում հայտարարված սխեման։ Այս դեպքում ես օգտագործեցի .cast մեթոդը, քանի որ պետք չէ սպասել գործակալի արդյունքին, բայց հարկ է նշել, որ. ուղիները հաղորդագրություն ուղարկել թեմային.
cast - չի արգելափակում, քանի որ արդյունք չի ակնկալում: Դուք չեք կարող արդյունքը ուղարկել այլ թեմայի որպես հաղորդագրություն:
ուղարկել - չի արգելափակում, քանի որ արդյունք չի ակնկալում: Թեմայում կարող եք նշել գործակալ, որին կգնա արդյունքը:
հարցնել - սպասում է արդյունքի: Թեմայում կարող եք նշել գործակալ, որին կգնա արդյունքը:
Այսպիսով, այս ամենն այսօր գործակալների հետ է:
Երազանքի թիմը
Վերջին բանը, որ խոստացել եմ գրել այս մասում, հրամաններն են։ Ինչպես նշվեց ավելի վաղ, faust-ի հրամանները սեղմման շուրջ են: Փաստորեն, faust-ը պարզապես կցում է մեր հատուկ հրամանն իր ինտերֆեյսին, երբ նշում է -A ստեղնը
Հայտարարված գործակալների ներսից հետո agents.py ավելացրեք գործառույթ դեկորատորի միջոցով app.commandկոչելով մեթոդը դերաբաշխում у հավաքել_արժեթղթեր:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Այսպիսով, եթե մենք կանչենք հրամանների ցանկը, մեր նոր հրամանը կլինի դրանում.
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Մենք կարող ենք օգտագործել այն բոլորի նման, ուստի եկեք վերագործարկենք faust worker-ը և սկսենք արժեթղթերի ամբողջական հավաքածու.
> faust -A horton.agents start-collect-securities
Ինչ է լինելու հաջորդը.
Հաջորդ մասում, օգտագործելով մնացած գործակալները որպես օրինակ, մենք կքննարկենք տարվա ընթացքում առևտրի փակման գներում ծայրահեղությունների որոնման մեխանիզմը և գործակալների բաց թողարկումը:
Այսքանն է այսօրվա համար: Շնորհակալություն կարդալու համար :)