Tugas latar belakang Faust, Bagian II: Agen dan Tim

Tugas latar belakang Faust, Bagian II: Agen dan Tim

daftar isi

  1. Bagian I: Pendahuluan

  2. Bagian II: Agen dan Tim

Apa yang kita lakukan di sini?

Jadi, bagian kedua. Seperti yang ditulis sebelumnya, di dalamnya kita akan melakukan hal berikut:

  1. Mari kita menulis klien kecil untuk alphavantage di aiohttp dengan permintaan untuk titik akhir yang kita butuhkan.

  2. Mari kita buat agen yang akan mengumpulkan data sekuritas dan informasi meta tentangnya.

Namun, inilah yang akan kami lakukan untuk proyek itu sendiri, dan dalam hal penelitian faust, kami akan mempelajari cara menulis agen yang memproses peristiwa aliran dari kafka, serta cara menulis perintah (klik pembungkus), dalam kasus kami - untuk pesan push manual ke topik yang dipantau agen.

Latihan

Klien AlphaVantage

Pertama, mari kita tulis klien aiohttp kecil untuk permintaan ke alphavantage.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Sebenarnya semuanya jelas dari ini:

  1. AlphaVantage API dirancang dengan cukup sederhana dan indah, jadi saya memutuskan untuk membuat semua permintaan melalui metode ini construct_query dimana pada gilirannya ada panggilan http.

  2. Saya membawa semua bidang ke snake_case untuk kenyamanan.

  3. Nah, dekorasi logger.catch untuk keluaran traceback yang indah dan informatif.

PS Jangan lupa menambahkan token alphavantage secara lokal ke config.yml, atau mengekspor variabel lingkungan HORTON_SERVICE_APIKEY. Kami menerima token di sini.

kelas CRUD

Kami akan memiliki koleksi sekuritas untuk menyimpan informasi meta tentang sekuritas.

database/keamanan.py

Menurut pendapat saya, tidak perlu menjelaskan apa pun di sini, dan kelas dasarnya sendiri cukup sederhana.

dapatkan_aplikasi()

Mari tambahkan fungsi untuk membuat objek aplikasi app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Untuk saat ini kami akan memiliki pembuatan aplikasi yang paling sederhana, nanti kami akan mengembangkannya, namun agar tidak membuat Anda menunggu, berikut referensi ke kelas Aplikasi. Saya juga menyarankan Anda untuk melihat kelas pengaturan, karena kelas ini bertanggung jawab atas sebagian besar pengaturan.

Bagian utama

Agen untuk mengumpulkan dan memelihara daftar surat berharga

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Jadi, pertama-tama kita dapatkan objek aplikasi faust - ini cukup sederhana. Selanjutnya, kami mendeklarasikan topik untuk agen kami secara eksplisit... Di sini perlu disebutkan apa itu, apa parameter internalnya, dan bagaimana hal ini dapat diatur secara berbeda.

  1. Topik dalam kafka, jika kita ingin mengetahui definisi pastinya, ada baiknya kita membaca mati. dokumen, atau Anda dapat membaca abstrak di Habré dalam bahasa Rusia, semuanya juga tercermin dengan cukup akurat :)

  2. Parameter dalam, dijelaskan dengan cukup baik di dokumen faust, memungkinkan kita untuk mengonfigurasi topik langsung di kode, tentu saja, ini berarti parameter yang disediakan oleh pengembang faust, misalnya: retensi, kebijakan penyimpanan (secara default hapus, tetapi Anda dapat mengatur padat), jumlah partisi per topik (skoruntuk melakukan, misalnya, kurang dari signifikansi global aplikasi faust).

  3. Secara umum, agen dapat membuat topik terkelola dengan nilai global, namun saya ingin menyatakan semuanya secara eksplisit. Selain itu, beberapa parameter (misalnya, jumlah partisi atau kebijakan penyimpanan) topik dalam iklan agen tidak dapat dikonfigurasi.

    Berikut tampilannya tanpa menentukan topik secara manual:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Nah, sekarang mari kita uraikan apa yang akan dilakukan agen kami :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Jadi, di awal agen, kami membuka sesi aiohttp untuk permintaan melalui klien kami. Jadi, saat memulai seorang pekerja, ketika agen kami diluncurkan, sebuah sesi akan segera dibuka - satu, selama pekerja tersebut berjalan (atau beberapa, jika Anda mengubah parameter konkurensi dari agen dengan unit default).

Selanjutnya kita ikuti arusnya (kita tempatkan pesannya di _, karena kami, di agen ini, tidak peduli dengan konten) pesan dari topik kami, jika pesan tersebut ada pada offset saat ini, jika tidak, siklus kami akan menunggu kedatangannya. Nah, di dalam loop kami, kami mencatat penerimaan pesan, mendapatkan daftar sekuritas aktif (get_securities hanya aktif secara default, lihat kode klien) dan menyimpannya ke database, memeriksa apakah ada keamanan dengan ticker yang sama dan exchange di database, kalau ada maka (kertasnya) langsung diupdate.

Ayo luncurkan kreasi kita!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Fitur PS komponen web Saya tidak akan mempertimbangkan Faust dalam artikel tersebut, jadi kami memasang bendera yang sesuai.

Dalam perintah peluncuran kami, kami memberi tahu faust di mana mencari objek aplikasi dan apa yang harus dilakukan dengannya (meluncurkan pekerja) dengan tingkat keluaran log info. Kami mendapatkan output berikut:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Ini hidup!!!

Mari kita lihat kumpulan partisi. Seperti yang bisa kita lihat, sebuah topik dibuat dengan nama yang kita tentukan dalam kode, jumlah partisi default (8, diambil dari topik_partisi - parameter objek aplikasi), karena kami tidak menentukan nilai individual untuk topik kami (melalui partisi). Agen yang diluncurkan di pekerja ditugaskan ke semua 8 partisi, karena ini adalah satu-satunya, tetapi ini akan dibahas lebih rinci di bagian pengelompokan.

Nah, sekarang kita bisa pergi ke jendela terminal lain dan mengirim pesan kosong ke topik kita:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS menggunakan @ kami menunjukkan bahwa kami mengirim pesan ke topik bernama “collect_securities”.

Dalam hal ini, pesan masuk ke partisi 6 - Anda dapat memeriksanya dengan membuka kafdrop on localhost:9000

Pergi ke jendela terminal dengan pekerja kami, kami akan melihat pesan bahagia yang dikirim menggunakan logguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Kita juga dapat melihat mongo (menggunakan Robo3T atau Studio3T) dan melihat bahwa sekuritasnya ada di database:

Saya bukan miliarder, dan oleh karena itu kami puas dengan opsi tontonan pertama.

Tugas latar belakang Faust, Bagian II: Agen dan TimTugas latar belakang Faust, Bagian II: Agen dan Tim

Kebahagiaan dan kegembiraan - agen pertama sudah siap :)

Agen siap, panjang umur agen baru!

Iya bapak-bapak, kami baru membahas 1/3 dari jalur yang disiapkan artikel ini, namun jangan berkecil hati, karena sekarang akan lebih mudah.

Jadi sekarang kita memerlukan agen yang mengumpulkan informasi meta dan memasukkannya ke dalam dokumen koleksi:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Karena agen ini akan memproses informasi tentang keamanan tertentu, kita perlu menunjukkan ticker (simbol) keamanan ini dalam pesan. Untuk tujuan ini di faust ada Arsip — kelas yang mendeklarasikan skema pesan dalam topik agen.

Dalam hal ini, mari kita pergi ke catatan.py dan jelaskan seperti apa pesan untuk topik ini:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Seperti yang sudah Anda duga, faust menggunakan anotasi tipe python untuk mendeskripsikan skema pesan, itulah sebabnya versi minimum yang didukung oleh perpustakaan adalah 3.6.

Mari kembali ke agen, atur jenisnya dan tambahkan:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Seperti yang Anda lihat, kami meneruskan parameter baru dengan skema ke metode inisialisasi topik - value_type. Selanjutnya, semuanya mengikuti skema yang sama, jadi saya tidak melihat ada gunanya memikirkan hal lain.

Nah, sentuhan terakhir adalah menambahkan panggilan ke agen pengumpulan informasi meta ke collector_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Kami menggunakan skema yang diumumkan sebelumnya untuk pesan tersebut. Dalam hal ini, saya menggunakan metode .cast karena kita tidak perlu menunggu hasil dari agen, tetapi perlu disebutkan bahwa cara kirim pesan ke topik:

  1. cast - tidak memblokir karena tidak mengharapkan hasil. Anda tidak dapat mengirimkan hasilnya ke topik lain sebagai pesan.

  2. kirim - tidak memblokir karena tidak mengharapkan hasil. Anda dapat menentukan agen dalam topik yang akan dijadikan tujuan hasil.

  3. bertanya - menunggu hasilnya. Anda dapat menentukan agen dalam topik yang akan dijadikan tujuan hasil.

Jadi, itu saja tentang agen hari ini!

Tim impian

Hal terakhir yang saya janjikan untuk ditulis di bagian ini adalah perintah. Seperti disebutkan sebelumnya, perintah di faust adalah pembungkus klik. Faktanya, faust cukup melampirkan perintah khusus kami ke antarmukanya saat menentukan kunci -A

Setelah agen diumumkan masuk agen.py tambahkan fungsi dengan dekorator app.commandmemanggil metode tersebut cast у kumpulkan_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Jadi, jika kita memanggil daftar perintah, perintah baru kita akan ada di dalamnya:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Kita bisa menggunakannya seperti orang lain, jadi mari kita mulai ulang faustworker dan mulai mengumpulkan sekuritas secara lengkap:

> faust -A horton.agents start-collect-securities

Apa yang akan terjadi selanjutnya?

Pada bagian selanjutnya, dengan menggunakan agen yang tersisa sebagai contoh, kita akan melihat mekanisme sink untuk mencari harga penutupan perdagangan yang ekstrem untuk tahun ini dan peluncuran agen cron.

Itu saja untuk hari ini! Terima kasih sudah membaca :)

Kode untuk bagian ini

Tugas latar belakang Faust, Bagian II: Agen dan Tim

PS Di bagian terakhir saya ditanya tentang faust dan kafka konfluen (fitur apa yang dimiliki konfluen?). Tampaknya confluent lebih fungsional dalam banyak hal, tetapi faktanya faust tidak memiliki dukungan klien penuh untuk confluent - ini mengikuti dari deskripsi pembatasan klien di dokumen.

Sumber: www.habr.com

Tambah komentar