Yani ikinci kısım. Daha önce yazıldığı gibi, içinde aşağıdakileri yapacağız:
İhtiyacımız olan uç noktalara yönelik istekleri içeren, aiohttp üzerinde alphavantage için küçük bir istemci yazalım.
Menkul kıymetlere ilişkin verileri ve bunlarla ilgili meta bilgileri toplayacak bir aracı oluşturalım.
Ancak projenin kendisi için yapacağımız şey budur ve faust araştırması açısından, bizim durumumuzda kafka'dan akış olaylarını işleyen aracıların nasıl yazılacağının yanı sıra komutların (tıklama sarmalayıcı) nasıl yazılacağını da öğreneceğiz - Aracının izlediği konuya manuel push mesajları için.
Eğitim
AlphaVantage İstemcisi
Öncelikle, alphavantage istekleri için küçük bir aiohttp istemcisi yazalım.
AlphaVantage API oldukça basit ve güzel bir tasarıma sahip olduğundan tüm istekleri bu yöntemle yapmaya karar verdim. construct_query burada sırayla bir http çağrısı var.
Bütün alanları buraya getiriyorum snake_case rahatlık için.
Güzel ve bilgilendirici geri izleme çıktısı için logger.catch dekorasyonu.
PS Alphavantage belirtecini yerel olarak config.yml dosyasına eklemeyi veya ortam değişkenini dışa aktarmayı unutmayın. HORTON_SERVICE_APIKEY. Bir jeton alıyoruz burada.
CRUD sınıfı
Menkul kıymetlerle ilgili meta bilgileri depolamak için bir menkul kıymet koleksiyonumuz olacak.
Şimdilik en basit uygulama oluşturma işlemini gerçekleştireceğiz, biraz sonra genişleteceğiz ancak sizi bekletmemek için burada Referanslar Uygulama sınıfına. Ayrıca ayarların çoğundan sorumlu olduğu için ayarlar sınıfına da göz atmanızı tavsiye ederim.
Ana
Menkul kıymetlerin listesini toplama ve muhafaza etme aracısı
Yani ilk önce faust uygulama nesnesini alıyoruz - bu oldukça basit. Daha sonra temsilcimiz için açıkça bir konu ilan ediyoruz... Burada bunun ne olduğunu, iç parametrenin ne olduğunu ve bunun nasıl farklı şekilde düzenlenebileceğini belirtmekte fayda var.
Kafka'daki konular, tam tanımını bilmek istiyorsak okumak daha iyi olur kapalı. belgeveya okuyabilirsiniz Öz Rusça Habré'de, her şey de oldukça doğru bir şekilde yansıtılıyor :)
Parametre dahiliFaust belgesinde oldukça iyi açıklanan , konuyu doğrudan kodda yapılandırmamıza olanak tanır, elbette bu, faust geliştiricileri tarafından sağlanan parametreler anlamına gelir, örneğin: saklama, saklama politikası (varsayılan olarak silme, ancak siz ayarlayabilirsiniz) kompakt), konu başına bölüm sayısı (puanlarıörneğin daha azını yapmak küresel önem uygulamalar başarısız).
Genel olarak temsilci, global değerlere sahip yönetilen bir konu oluşturabilir, ancak ben her şeyi açıkça beyan etmeyi seviyorum. Ayrıca, aracı reklamındaki konunun bazı parametreleri (örneğin, bölüm sayısı veya saklama politikası) yapılandırılamaz.
Konuyu manuel olarak tanımlamadan şöyle görünebilir:
O halde şimdi acentemizin ne yapacağını anlatalım :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Yani, Agent'ın başında client'ımız üzerinden gelen istekler için aiohttp oturumu açıyoruz. Böylece, bir işçiyi başlatırken, aracımız başlatıldığında, derhal bir oturum açılacaktır - işçinin çalıştığı süre boyunca bir oturum (veya parametreyi değiştirirseniz birkaç tane) eşzamanlılık varsayılan birimi olan bir temsilciden).
Daha sonra akışı takip ediyoruz (mesajı _, biz bu temsilcide konumuzdaki mesajların içeriğini umursamıyoruz, eğer mevcut ofsette mevcutlarsa, aksi takdirde döngümüz onların gelişini bekleyecektir. Döngümüzün içinde, mesajın alındığını günlüğe kaydediyoruz, aktif (get_securities yalnızca varsayılan olarak aktiftir, müşteri koduna bakın) menkul kıymetlerin bir listesini alıyoruz ve bunu veritabanına kaydediyoruz, aynı onay işaretine sahip bir menkul kıymet olup olmadığını kontrol ediyoruz ve veritabanında değişim varsa, o zaman (kağıt) basitçe güncellenecektir.
Hadi yaratımımızı başlatalım!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
PS Özellikleri web bileşeni Makalelerde faust'u dikkate almayacağım, bu yüzden uygun bayrağı belirledik.
Başlatma komutumuzda Faust'a info log çıktı seviyesi ile uygulama nesnesini nerede arayacağını ve onunla ne yapacağını (worker başlatma) anlattık. Aşağıdaki çıktıyı alıyoruz:
Bölüm kümesine bakalım. Görüldüğü gibi kodda belirlediğimiz isim ile varsayılan partition sayısı (8, from'dan alınmıştır) ile bir konu oluşturuldu. konu_bölümleri - uygulama nesnesi parametresi), konumuz için bireysel bir değer belirtmediğimiz için (bölümler aracılığıyla). İşçide başlatılan aracıya 8 bölümün tamamı atanır, çünkü o tek bölümdür, ancak bu, kümeleme ile ilgili bölümde daha ayrıntılı olarak tartışılacaktır.
Artık başka bir terminal penceresine gidip konumuza boş bir mesaj gönderebiliriz:
PS kullanarak @ “collect_securities” isimli bir konuya mesaj gönderdiğimizi gösteriyoruz.
Bu durumda mesaj bölüm 6'ya gitti; bunu kafdrop'a giderek kontrol edebilirsiniz. localhost:9000
Çalışanımızla birlikte terminal penceresine gittiğimizde loguru kullanılarak gönderilen mutlu bir mesajı göreceğiz:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Ayrıca mongo'ya bakabiliriz (Robo3T veya Studio3T kullanarak) ve menkul kıymetlerin veritabanında olduğunu görebiliriz:
Ben bir milyarder değilim ve bu nedenle ilk görüntüleme seçeneğinden memnunuz.
Mutluluk ve neşe - ilk temsilci hazır :)
Ajan hazır, yaşasın yeni ajan!
Evet beyler, bu makalenin hazırladığı yolun yalnızca 1/3'ünü kat ettik ama cesaretiniz kırılmasın çünkü artık daha kolay olacak.
Artık meta bilgileri toplayan ve onu bir koleksiyon belgesine koyan bir aracıya ihtiyacımız var:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Bu aracı belirli bir menkul kıymet hakkındaki bilgileri işleyeceğinden, bu menkul kıymetin işaretini (sembolünü) mesajda belirtmemiz gerekir. Faust'ta bu amaçla Kayıtlar — aracı konusundaki mesaj şemasını bildiren sınıflar.
Bu durumda şuraya gidelim: kayıtlar.pyve bu konuya ilişkin mesajın nasıl görünmesi gerektiğini açıklayın:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Tahmin edebileceğiniz gibi faust, mesaj şemasını tanımlamak için python tipi açıklamayı kullanır; bu nedenle kitaplık tarafından desteklenen minimum sürüm şu şekildedir: 3.6.
Aracıya dönelim, türleri ayarlayalım ve ekleyelim:
Gördüğünüz gibi, konu başlatma yöntemine - değer_tipi - şemalı yeni bir parametre aktarıyoruz. Üstelik her şey aynı şemayı takip ediyor, bu yüzden başka bir şey üzerinde durmanın bir anlamı yok.
Son dokunuş, meta bilgi toplama aracısına Collect_securitites'e bir çağrı eklemektir:
Mesaj için daha önce duyurulan şemayı kullanıyoruz. Bu durumda aracıdan sonuç beklememize gerek kalmadığı için .cast yöntemini kullandım ancak şunu da belirtmekte fayda var. yolları konuya mesaj gönderin:
cast - bir sonuç beklemediği için engellemez. Sonucu başka bir konuya mesaj olarak gönderemezsiniz.
gönder - bir sonuç beklemediğinden engellemez. Sonucun gideceği konuda bir aracı belirtebilirsiniz.
sor - bir sonuç bekler. Sonucun gideceği konuda bir aracı belirtebilirsiniz.
Bugünlük acentelerle ilgili bu kadar!
Rüya takımı
Bu bölümde yazmaya söz verdiğim son şey komutlar. Daha önce de belirtildiği gibi, faust'taki komutlar tıklamanın etrafındaki bir sarmalayıcıdır. Aslında faust, -A anahtarını belirtirken özel komutumuzu arayüzüne ekler.
Açıklanan acentelerin ardından ajanlar.py dekoratörlü bir işlev ekleyin uygulama.komutyöntemi çağırmak döküm у koleksiyon_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Dolayısıyla, komutların listesini çağırırsak, yeni komutumuz onun içinde olacaktır:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Bunu herkes gibi kullanabiliriz, bu yüzden faust işçisini yeniden başlatalım ve tam teşekküllü bir menkul kıymet koleksiyonuna başlayalım:
> faust -A horton.agents start-collect-securities
Sonra ne olacak?
Bir sonraki bölümde, kalan acenteleri örnek olarak kullanarak, yılın kapanış fiyatlarındaki uç noktaları aramak ve acentelerin cron lansmanını yapmak için lavabo mekanizmasını ele alacağız.