Tugas latar belakang pada Faust, Bahagian II: Ejen dan Pasukan

Tugas latar belakang pada Faust, Bahagian II: Ejen dan Pasukan

jadual kandungan

  1. Bahagian I: Pengenalan

  2. Bahagian II: Ejen dan Pasukan

Apa yang kita buat di sini?

Jadi, jadi, bahagian kedua. Seperti yang ditulis sebelum ini, di dalamnya kita akan melakukan perkara berikut:

  1. Mari kita tulis klien kecil untuk alphavantage pada aiohttp dengan permintaan untuk titik akhir yang kita perlukan.

  2. Mari cipta ejen yang akan mengumpul data mengenai sekuriti dan maklumat meta mengenainya.

Tetapi, inilah yang akan kami lakukan untuk projek itu sendiri, dan dari segi penyelidikan yang pantas, kami akan belajar cara menulis ejen yang memproses acara aliran dari kafka, serta cara menulis arahan (pembungkus klik), dalam kes kami - untuk mesej tolak manual ke topik yang sedang dipantau oleh ejen.

Latihan

Pelanggan AlphaVantage

Mula-mula, mari tulis klien aiohttp kecil untuk permintaan alphavantage.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Sebenarnya, semuanya jelas daripadanya:

  1. API AlphaVantage direka dengan agak ringkas dan cantik, jadi saya memutuskan untuk membuat semua permintaan melalui kaedah tersebut construct_query di mana pula terdapat panggilan http.

  2. Saya bawa semua bidang ke snake_case untuk kemudahan.

  3. Nah, hiasan logger.catch untuk keluaran jejak balik yang cantik dan bermaklumat.

PS Jangan lupa untuk menambah token alphavantage secara setempat ke config.yml, atau mengeksport pembolehubah persekitaran HORTON_SERVICE_APIKEY. Kami menerima token di sini.

kelas CRUD

Kami akan mempunyai koleksi sekuriti untuk menyimpan maklumat meta tentang sekuriti.

pangkalan data/security.py

Pada pendapat saya, tidak perlu menjelaskan apa-apa di sini, dan kelas asas itu sendiri agak mudah.

get_app()

Mari tambah fungsi untuk mencipta objek aplikasi dalam app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Buat masa ini kami akan mempunyai penciptaan aplikasi yang paling mudah, sedikit kemudian kami akan mengembangkannya, bagaimanapun, untuk tidak membuat anda menunggu, di sini rujukan kepada App-class. Saya juga menasihati anda untuk melihat kelas tetapan, kerana ia bertanggungjawab untuk kebanyakan tetapan.

Bahagian utama

Ejen untuk mengumpul dan mengekalkan senarai sekuriti

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Jadi, mula-mula kita dapatkan objek aplikasi faust - ia agak mudah. Seterusnya, kami secara eksplisit mengisytiharkan topik untuk ejen kami... Di sini adalah wajar untuk menyebut apa itu, apakah parameter dalaman dan cara ini boleh diatur secara berbeza.

  1. Topik dalam kafka, jika kita ingin mengetahui definisi yang tepat, lebih baik membaca dimatikan. dokumen, atau anda boleh membaca ringkasan pada Habré dalam bahasa Rusia, di mana segala-galanya juga dicerminkan dengan tepat :)

  2. Parameter dalaman, diterangkan dengan baik dalam dokumen faust, membolehkan kami mengkonfigurasi topik secara langsung dalam kod, sudah tentu, ini bermakna parameter yang disediakan oleh pembangun faust, contohnya: pengekalan, dasar pengekalan (secara lalai padam, tetapi anda boleh menetapkan padat), bilangan partition setiap topik (skoruntuk melakukan, sebagai contoh, kurang daripada kepentingan global aplikasi faust).

  3. Secara umum, ejen boleh mencipta topik terurus dengan nilai global, namun, saya suka mengisytiharkan semuanya secara eksplisit. Selain itu, beberapa parameter (contohnya, bilangan partition atau dasar pengekalan) topik dalam iklan ejen tidak boleh dikonfigurasikan.

    Inilah rupanya tanpa menentukan topik secara manual:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Nah, sekarang mari kita terangkan apa yang akan dilakukan oleh ejen kami :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Jadi, pada permulaan ejen, kami membuka sesi aiohttp untuk permintaan melalui pelanggan kami. Oleh itu, apabila memulakan pekerja, apabila ejen kami dilancarkan, sesi akan segera dibuka - satu, untuk sepanjang masa pekerja sedang berjalan (atau beberapa, jika anda menukar parameter serentak daripada ejen dengan unit lalai).

Seterusnya, kami mengikuti aliran (kami meletakkan mesej dalam _, memandangkan kami, dalam ejen ini, tidak mengambil berat tentang kandungan) mesej daripada topik kami, jika ia wujud pada offset semasa, jika tidak, kitaran kami akan menunggu ketibaan mereka. Nah, di dalam gelung kami, kami log penerimaan mesej, dapatkan senarai sekuriti aktif (get_securities kembali aktif secara lalai, lihat kod pelanggan) dan simpan ke pangkalan data, menyemak sama ada terdapat keselamatan dengan ticker yang sama dan pertukaran dalam pangkalan data , jika ada, maka ia (kertas itu) hanya akan dikemas kini.

Mari lancarkan ciptaan kami!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Ciri-ciri PS komponen web Saya tidak akan menganggap faust dalam artikel, jadi kami menetapkan bendera yang sesuai.

Dalam arahan pelancaran kami, kami memberitahu faust tempat untuk mencari objek aplikasi dan perkara yang perlu dilakukan dengannya (melancarkan pekerja) dengan tahap output log maklumat. Kami mendapat output berikut:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Ia hidup!!!

Mari lihat set partition. Seperti yang kita dapat lihat, topik telah dibuat dengan nama yang kami tetapkan dalam kod, bilangan lalai partition (8, diambil daripada topic_partitions - parameter objek aplikasi), kerana kami tidak menentukan nilai individu untuk topik kami (melalui partition). Ejen yang dilancarkan dalam pekerja diberikan semua 8 partition, kerana ia adalah satu-satunya, tetapi ini akan dibincangkan dengan lebih terperinci dalam bahagian tentang pengelompokan.

Nah, sekarang kita boleh pergi ke tetingkap terminal lain dan menghantar mesej kosong ke topik kita:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS menggunakan @ kami menunjukkan bahawa kami menghantar mesej kepada topik bernama "collect_securities".

Dalam kes ini, mesej pergi ke partition 6 - anda boleh menyemak ini dengan pergi ke kafdrop on localhost:9000

Pergi ke tetingkap terminal bersama pekerja kami, kami akan melihat mesej gembira yang dihantar menggunakan loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Kami juga boleh melihat mongo (menggunakan Robo3T atau Studio3T) dan melihat bahawa sekuriti berada dalam pangkalan data:

Saya bukan jutawan, dan oleh itu kami berpuas hati dengan pilihan tontonan pertama.

Tugas latar belakang pada Faust, Bahagian II: Ejen dan PasukanTugas latar belakang pada Faust, Bahagian II: Ejen dan Pasukan

Kegembiraan dan kegembiraan - ejen pertama sudah bersedia :)

Ejen sedia, hidup ejen baru!

Ya, tuan-tuan, kami hanya menutup 1/3 daripada laluan yang disediakan oleh artikel ini, tetapi jangan berkecil hati, kerana sekarang ia akan menjadi lebih mudah.

Jadi sekarang kami memerlukan ejen yang mengumpul maklumat meta dan memasukkannya ke dalam dokumen pengumpulan:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Memandangkan ejen ini akan memproses maklumat tentang keselamatan tertentu, kami perlu menunjukkan ticker (simbol) keselamatan ini dalam mesej. Untuk tujuan ini dalam faus ada Rekod — kelas yang mengisytiharkan skema mesej dalam topik ejen.

Dalam kes ini, mari pergi ke records.py dan terangkan rupa mesej untuk topik ini:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Seperti yang anda duga, faust menggunakan anotasi jenis python untuk menerangkan skema mesej, itulah sebabnya versi minimum yang disokong oleh perpustakaan ialah 3.6.

Mari kembali kepada ejen, tetapkan jenis dan tambahnya:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Seperti yang anda lihat, kami menghantar parameter baharu dengan skema kepada kaedah permulaan topik - value_type. Selanjutnya, semuanya mengikut skema yang sama, jadi saya tidak nampak apa-apa gunanya memikirkan perkara lain.

Nah, sentuhan terakhir ialah menambah panggilan kepada ejen pengumpulan maklumat meta untuk collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Kami menggunakan skema yang diumumkan sebelum ini untuk mesej. Dalam kes ini, saya menggunakan kaedah .cast kerana kita tidak perlu menunggu keputusan daripada ejen, tetapi perlu dinyatakan bahawa cara-cara hantar mesej kepada topik:

  1. cast - tidak menyekat kerana tidak mengharapkan hasil. Anda tidak boleh menghantar hasil ke topik lain sebagai mesej.

  2. hantar - tidak menyekat kerana tidak mengharapkan hasil. Anda boleh menentukan ejen dalam topik yang hasilnya akan dihantar.

  3. bertanya - menunggu keputusan. Anda boleh menentukan ejen dalam topik yang hasilnya akan dihantar.

Jadi, itu sahaja dengan ejen untuk hari ini!

Pasukan impian

Perkara terakhir yang saya janjikan untuk menulis dalam bahagian ini ialah arahan. Seperti yang dinyatakan sebelum ini, arahan dalam faust adalah pembungkus di sekitar klik. Sebenarnya, faust hanya melampirkan arahan tersuai kami pada antara mukanya apabila menentukan kekunci -A

Selepas ejen diumumkan masuk agents.py menambah fungsi dengan penghias app.commandmemanggil kaedah membuang у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Oleh itu, jika kami memanggil senarai arahan, arahan baharu kami akan berada di dalamnya:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Kita boleh menggunakannya seperti orang lain, jadi mari kita mulakan semula pekerja faust dan mulakan koleksi sekuriti yang lengkap:

> faust -A horton.agents start-collect-securities

Apa yang akan berlaku seterusnya?

Dalam bahagian seterusnya, menggunakan baki ejen sebagai contoh, kami akan mempertimbangkan mekanisme tenggelam untuk mencari keterlaluan dalam harga penutupan dagangan untuk tahun ini dan pelancaran cron ejen.

Itu sahaja untuk hari ini! Terima kasih untuk membaca :)

Kod untuk bahagian ini

Tugas latar belakang pada Faust, Bahagian II: Ejen dan Pasukan

PS Di bawah bahagian terakhir saya ditanya tentang kafka faust dan confluent (apakah ciri yang ada pada konfluen?). Nampaknya confluent lebih berfungsi dalam banyak cara, tetapi hakikatnya faust tidak mempunyai sokongan pelanggan penuh untuk confluent - ini berikutan daripada penerangan tentang sekatan pelanggan dalam dokumen.

Sumber: www.habr.com

Tambah komen