Faust, Bölüm II: Temsilciler ve Ekipler ile ilgili arka plan görevleri

Faust, Bölüm II: Temsilciler ve Ekipler ile ilgili arka plan görevleri

içindekiler

  1. Bölüm I: Giriş

  2. Bölüm II: Temsilciler ve Ekipler

Burada ne yapıyoruz?

Yani ikinci kısım. Daha önce yazıldığı gibi, içinde aşağıdakileri yapacağız:

  1. İhtiyacımız olan uç noktalara yönelik istekleri içeren, aiohttp üzerinde alphavantage için küçük bir istemci yazalım.

  2. Menkul kıymetlere ilişkin verileri ve bunlarla ilgili meta bilgileri toplayacak bir aracı oluşturalım.

Ancak projenin kendisi için yapacağımız şey budur ve faust araştırması açısından, bizim durumumuzda kafka'dan akış olaylarını işleyen aracıların nasıl yazılacağının yanı sıra komutların (tıklama sarmalayıcı) nasıl yazılacağını da öğreneceğiz - Aracının izlediği konuya manuel push mesajları için.

Eğitim

AlphaVantage İstemcisi

Öncelikle, alphavantage istekleri için küçük bir aiohttp istemcisi yazalım.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Aslında her şey bundan açıkça anlaşılıyor:

  1. AlphaVantage API oldukça basit ve güzel bir tasarıma sahip olduğundan tüm istekleri bu yöntemle yapmaya karar verdim. construct_query burada sırayla bir http çağrısı var.

  2. Bütün alanları buraya getiriyorum snake_case rahatlık için.

  3. Güzel ve bilgilendirici geri izleme çıktısı için logger.catch dekorasyonu.

PS Alphavantage belirtecini yerel olarak config.yml dosyasına eklemeyi veya ortam değişkenini dışa aktarmayı unutmayın. HORTON_SERVICE_APIKEY. Bir jeton alıyoruz burada.

CRUD sınıfı

Menkul kıymetlerle ilgili meta bilgileri depolamak için bir menkul kıymet koleksiyonumuz olacak.

veritabanı/güvenlik.py

Bana göre burada hiçbir şeyi açıklamaya gerek yok ve temel sınıfın kendisi de oldukça basit.

uygulamayı al()

Uygulama nesnesi oluşturmak için bir işlev ekleyelim uygulama.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Şimdilik en basit uygulama oluşturma işlemini gerçekleştireceğiz, biraz sonra genişleteceğiz ancak sizi bekletmemek için burada Referanslar Uygulama sınıfına. Ayrıca ayarların çoğundan sorumlu olduğu için ayarlar sınıfına da göz atmanızı tavsiye ederim.

Ana

Menkul kıymetlerin listesini toplama ve muhafaza etme aracısı

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Yani ilk önce faust uygulama nesnesini alıyoruz - bu oldukça basit. Daha sonra temsilcimiz için açıkça bir konu ilan ediyoruz... Burada bunun ne olduğunu, iç parametrenin ne olduğunu ve bunun nasıl farklı şekilde düzenlenebileceğini belirtmekte fayda var.

  1. Kafka'daki konular, tam tanımını bilmek istiyorsak okumak daha iyi olur kapalı. belgeveya okuyabilirsiniz Öz Rusça Habré'de, her şey de oldukça doğru bir şekilde yansıtılıyor :)

  2. Parametre dahiliFaust belgesinde oldukça iyi açıklanan , konuyu doğrudan kodda yapılandırmamıza olanak tanır, elbette bu, faust geliştiricileri tarafından sağlanan parametreler anlamına gelir, örneğin: saklama, saklama politikası (varsayılan olarak silme, ancak siz ayarlayabilirsiniz) kompakt), konu başına bölüm sayısı (puanlarıörneğin daha azını yapmak küresel önem uygulamalar başarısız).

  3. Genel olarak temsilci, global değerlere sahip yönetilen bir konu oluşturabilir, ancak ben her şeyi açıkça beyan etmeyi seviyorum. Ayrıca, aracı reklamındaki konunun bazı parametreleri (örneğin, bölüm sayısı veya saklama politikası) yapılandırılamaz.

    Konuyu manuel olarak tanımlamadan şöyle görünebilir:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

O halde şimdi acentemizin ne yapacağını anlatalım :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Yani, Agent'ın başında client'ımız üzerinden gelen istekler için aiohttp oturumu açıyoruz. Böylece, bir işçiyi başlatırken, aracımız başlatıldığında, derhal bir oturum açılacaktır - işçinin çalıştığı süre boyunca bir oturum (veya parametreyi değiştirirseniz birkaç tane) eşzamanlılık varsayılan birimi olan bir temsilciden).

Daha sonra akışı takip ediyoruz (mesajı _, biz bu temsilcide konumuzdaki mesajların içeriğini umursamıyoruz, eğer mevcut ofsette mevcutlarsa, aksi takdirde döngümüz onların gelişini bekleyecektir. Döngümüzün içinde, mesajın alındığını günlüğe kaydediyoruz, aktif (get_securities yalnızca varsayılan olarak aktiftir, müşteri koduna bakın) menkul kıymetlerin bir listesini alıyoruz ve bunu veritabanına kaydediyoruz, aynı onay işaretine sahip bir menkul kıymet olup olmadığını kontrol ediyoruz ve veritabanında değişim varsa, o zaman (kağıt) basitçe güncellenecektir.

Hadi yaratımımızı başlatalım!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS Özellikleri web bileşeni Makalelerde faust'u dikkate almayacağım, bu yüzden uygun bayrağı belirledik.

Başlatma komutumuzda Faust'a info log çıktı seviyesi ile uygulama nesnesini nerede arayacağını ve onunla ne yapacağını (worker başlatma) anlattık. Aşağıdaki çıktıyı alıyoruz:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Yaşıyor!!!

Bölüm kümesine bakalım. Görüldüğü gibi kodda belirlediğimiz isim ile varsayılan partition sayısı (8, from'dan alınmıştır) ile bir konu oluşturuldu. konu_bölümleri - uygulama nesnesi parametresi), konumuz için bireysel bir değer belirtmediğimiz için (bölümler aracılığıyla). İşçide başlatılan aracıya 8 bölümün tamamı atanır, çünkü o tek bölümdür, ancak bu, kümeleme ile ilgili bölümde daha ayrıntılı olarak tartışılacaktır.

Artık başka bir terminal penceresine gidip konumuza boş bir mesaj gönderebiliriz:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS kullanarak @ “collect_securities” isimli bir konuya mesaj gönderdiğimizi gösteriyoruz.

Bu durumda mesaj bölüm 6'ya gitti; bunu kafdrop'a giderek kontrol edebilirsiniz. localhost:9000

Çalışanımızla birlikte terminal penceresine gittiğimizde loguru kullanılarak gönderilen mutlu bir mesajı göreceğiz:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Ayrıca mongo'ya bakabiliriz (Robo3T veya Studio3T kullanarak) ve menkul kıymetlerin veritabanında olduğunu görebiliriz:

Ben bir milyarder değilim ve bu nedenle ilk görüntüleme seçeneğinden memnunuz.

Faust, Bölüm II: Temsilciler ve Ekipler ile ilgili arka plan görevleriFaust, Bölüm II: Temsilciler ve Ekipler ile ilgili arka plan görevleri

Mutluluk ve neşe - ilk temsilci hazır :)

Ajan hazır, yaşasın yeni ajan!

Evet beyler, bu makalenin hazırladığı yolun yalnızca 1/3'ünü kat ettik ama cesaretiniz kırılmasın çünkü artık daha kolay olacak.

Artık meta bilgileri toplayan ve onu bir koleksiyon belgesine koyan bir aracıya ihtiyacımız var:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Bu aracı belirli bir menkul kıymet hakkındaki bilgileri işleyeceğinden, bu menkul kıymetin işaretini (sembolünü) mesajda belirtmemiz gerekir. Faust'ta bu amaçla Kayıtlar — aracı konusundaki mesaj şemasını bildiren sınıflar.

Bu durumda şuraya gidelim: kayıtlar.py ve bu konuya ilişkin mesajın nasıl görünmesi gerektiğini açıklayın:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Tahmin edebileceğiniz gibi faust, mesaj şemasını tanımlamak için python tipi açıklamayı kullanır; bu nedenle kitaplık tarafından desteklenen minimum sürüm şu şekildedir: 3.6.

Aracıya dönelim, türleri ayarlayalım ve ekleyelim:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Gördüğünüz gibi, konu başlatma yöntemine - değer_tipi - şemalı yeni bir parametre aktarıyoruz. Üstelik her şey aynı şemayı takip ediyor, bu yüzden başka bir şey üzerinde durmanın bir anlamı yok.

Son dokunuş, meta bilgi toplama aracısına Collect_securitites'e bir çağrı eklemektir:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Mesaj için daha önce duyurulan şemayı kullanıyoruz. Bu durumda aracıdan sonuç beklememize gerek kalmadığı için .cast yöntemini kullandım ancak şunu da belirtmekte fayda var. yolları konuya mesaj gönderin:

  1. cast - bir sonuç beklemediği için engellemez. Sonucu başka bir konuya mesaj olarak gönderemezsiniz.

  2. gönder - bir sonuç beklemediğinden engellemez. Sonucun gideceği konuda bir aracı belirtebilirsiniz.

  3. sor - bir sonuç bekler. Sonucun gideceği konuda bir aracı belirtebilirsiniz.

Bugünlük acentelerle ilgili bu kadar!

Rüya takımı

Bu bölümde yazmaya söz verdiğim son şey komutlar. Daha önce de belirtildiği gibi, faust'taki komutlar tıklamanın etrafındaki bir sarmalayıcıdır. Aslında faust, -A anahtarını belirtirken özel komutumuzu arayüzüne ekler.

Açıklanan acentelerin ardından ajanlar.py dekoratörlü bir işlev ekleyin uygulama.komutyöntemi çağırmak döküm у koleksiyon_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Dolayısıyla, komutların listesini çağırırsak, yeni komutumuz onun içinde olacaktır:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Bunu herkes gibi kullanabiliriz, bu yüzden faust işçisini yeniden başlatalım ve tam teşekküllü bir menkul kıymet koleksiyonuna başlayalım:

> faust -A horton.agents start-collect-securities

Sonra ne olacak?

Bir sonraki bölümde, kalan acenteleri örnek olarak kullanarak, yılın kapanış fiyatlarındaki uç noktaları aramak ve acentelerin cron lansmanını yapmak için lavabo mekanizmasını ele alacağız.

Hepsi bugün için! Okuduğunuz için teşekkürler :)

Bu bölümün kodu

Faust, Bölüm II: Temsilciler ve Ekipler ile ilgili arka plan görevleri

PS Son bölümde bana faust ve confluent kafka hakkında sorular soruldu (confluent'ın özellikleri nelerdir?). Görünüşe göre confluent birçok yönden daha işlevsel, ancak gerçek şu ki faust confluent için tam istemci desteğine sahip değil. belgedeki istemci kısıtlamalarının açıklamaları.

Kaynak: habr.com

Yorum ekle