Tugas latar mburi ing Faust, Part II: Agen lan Tim

Tugas latar mburi ing Faust, Part II: Agen lan Tim

Daftar Isi

  1. Bagian I: Pambuka

  2. Part II: Agen lan Tim

Apa sing kita tindakake ing kene?

Dadi, bagean kapindho. Kaya sing wis ditulis sadurunge, kita bakal nindakake ing ngisor iki:

  1. Ayo nulis klien cilik kanggo alphavantage ing aiohttp kanthi panjaluk kanggo titik pungkasan sing dibutuhake.

  2. Ayo nggawe agen sing bakal ngumpulake data babagan sekuritas lan informasi meta.

Nanging, iki sing bakal ditindakake kanggo proyek kasebut dhewe, lan babagan riset faust, kita bakal sinau carane nulis agen sing ngolah acara stream saka kafka, uga carane nulis printah (klik wrapper), ing kasus kita - kanggo pesen push manual kanggo topik sing agen ngawasi.

Latihan

Klien AlphaVantage

Pisanan, ayo nulis klien aiohttp cilik kanggo panjaluk alphavantage.

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Bener, kabeh wis jelas saka iku:

  1. API AlphaVantage cukup prasaja lan dirancang kanthi apik, mula aku mutusake kanggo nggawe kabeh panjaluk liwat metode kasebut construct_query ngendi ing siji ana telpon http.

  2. Aku nggawa kabeh lapangan kanggo snake_case kanggo comfort.

  3. Inggih, logger.catch dekorasi kanggo output traceback ayu lan informatif.

P.S. Aja lali kanggo nambah token alphavantage lokal kanggo config.yml, utawa ngekspor variabel lingkungan HORTON_SERVICE_APIKEY. Kita nampa token kene.

kelas CRUD

Kita bakal duwe koleksi sekuritas kanggo nyimpen informasi meta babagan sekuritas.

database/security.py

Ing mratelakake panemume, ana ora perlu kanggo nerangake apa kene, lan kelas dhasar dhewe cukup prasaja.

njaluk_app()

Ayo nambah fungsi kanggo nggawe obyek aplikasi ing app.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Saiki kita bakal nggawe aplikasi sing paling gampang, mengko kita bakal nggedhekake, nanging, supaya sampeyan ora ngenteni, kene referensi menyang App-kelas. Aku menehi saran uga dipikir ing kelas setelan, awit iku tanggung jawab kanggo paling setelan.

Bagéyan utama

Agen kanggo ngumpulake lan njaga dhaptar sekuritase

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Dadi, pisanan kita entuk obyek aplikasi faust - cukup prasaja. Sabanjure, kita kanthi tegas ngumumake topik kanggo agen kita ... Kene iku worth sebutno apa iku, apa parameter internal lan carane iki bisa diatur beda.

  1. Topik ing kafka, yen kita pengin ngerti definisi sing tepat, luwih becik maca mati. dokumen, utawa sampeyan bisa maca kompendium ing Habré ing basa Rusia, ing ngendi kabeh uga dibayangke kanthi akurat :)

  2. Parameter internal, diterangake cukup apik ing doc faust, ngidini kita ngatur topik langsung ing kode, mesthi, iki tegese paramèter sing diwenehake dening pangembang faust, contone: retensi, kabijakan penylametan (kanthi standar mbusak, nanging sampeyan bisa nyetel kompak), jumlah partisi saben topik (nilaikanggo nindakake, contone, kurang saka pinunjul global aplikasi faust).

  3. Umumé, agen bisa nggawe topik sing dikelola kanthi nilai global, nanging aku seneng ngumumake kabeh kanthi tegas. Kajaba iku, sawetara paramèter (contone, jumlah partisi utawa kabijakan penylametan) topik ing iklan agen ora bisa dikonfigurasi.

    Mangkene apa sing bisa katon tanpa nemtokake topik kanthi manual:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Nah, saiki ayo nerangake apa sing bakal ditindakake agen kita :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Dadi, ing wiwitan agen, kita mbukak sesi aiohttp kanggo panjalukan liwat klien kita. Mangkono, nalika miwiti buruh, nalika agen kita diluncurake, sesi bakal langsung dibukak - siji, kanggo kabeh wektu buruh mlaku (utawa sawetara, yen sampeyan ngganti parameter kasepakatan saka agen kanthi unit standar).

Sabanjure, kita tindakake stream (kita sijine pesen ing _, amarga kita, ing agen iki, ora peduli karo isi) pesen saka topik kita, yen ana ing offset saiki, yen siklus kita bakal ngenteni tekane. Inggih, ing daur ulang kita, kita log panrimo pesen, njaluk dhaptar aktif (get_securities bali mung aktif kanthi standar, ndeleng kode klien) securities lan nyimpen menyang database, mriksa yen ana keamanan karo ticker padha lan exchange ing database , yen ana, banjur (kertas) mung bakal dianyari.

Ayo miwiti kreasi kita!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

P.S. Kemungkinan komponen web Aku ora bakal nimbang faust ing artikel, supaya kita nyetel gendéra cocok.

Ing printah Bukak kita, kita marang faust ngendi kanggo nggoleki obyek aplikasi lan apa apa karo iku (bukak buruh) karo tingkat output log info. Kita entuk output ing ngisor iki:

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Urip iku!!!

Ayo katon ing set partisi. Kaya sing kita deleng, topik digawe kanthi jeneng sing ditunjuk ing kode kasebut, nomer partisi standar (8, dijupuk saka topik_partisi - parameter obyek aplikasi), amarga kita ora nemtokake nilai individu kanggo topik kita (liwat partisi). Agen dibukak ing buruh diutus kabeh 8 partisi, awit iku mung siji, nanging iki bakal rembugan ing liyane rinci ing bagean babagan clustering.

Saiki, kita bisa pindhah menyang jendhela terminal liyane lan ngirim pesen kosong menyang topik kita:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. kanthi nggunakake @ kita nuduhake yen kita ngirim pesen menyang topik sing jenenge "collect_securities".

Ing kasus iki, pesen menyang partisi 6 - sampeyan bisa mriksa iki kanthi pindhah menyang kafdrop ing localhost:9000

Menyang jendhela terminal karo buruh kita, kita bakal weruh pesen seneng dikirim nggunakake loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Kita uga bisa ndeleng mongo (nggunakake Robo3T utawa Studio3T) lan ndeleng manawa sekuritas kasebut ana ing basis data:

Aku dudu miliarder, mula kita seneng karo pilihan tampilan pisanan.

Tugas latar mburi ing Faust, Part II: Agen lan TimTugas latar mburi ing Faust, Part II: Agen lan Tim

Kabungahan lan kabungahan - agen pisanan wis siyap :)

Agen siap, langgeng agen anyar!

Ya, Bapak-bapak, mung 1/3 dalan sing wis disiapake dening artikel iki, nanging aja kesusu, amarga saiki bakal luwih gampang.

Dadi saiki kita butuh agen sing ngumpulake informasi meta lan sijine menyang dokumen koleksi:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Amarga agen iki bakal ngolah informasi babagan keamanan tartamtu, kita kudu nuduhake ticker (simbol) keamanan iki ing pesen kasebut. Kanggo maksud iki ing faust ana Records - kelas sing ngumumake skema pesen ing topik agen.

Ing kasus iki, ayo pindhah menyang cathetan.py lan njlèntrèhaké kaya apa pesen kanggo topik iki:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Kaya sing wis sampeyan duga, faust nggunakake anotasi jinis python kanggo njlèntrèhaké skema pesen, mulane versi minimal sing didhukung perpustakaan yaiku 3.6.

Ayo bali menyang agen, atur jinis lan tambahake:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Nalika sampeyan bisa ndeleng, kita pass parameter anyar karo skema kanggo topik cara initialization - value_type. Salajengipun, kabeh ngetutake skema sing padha, mula aku ora weruh apa-apa kanggo manggon ing bab liya.

Ya, sentuhan pungkasan yaiku nambah telpon menyang agen pengumpulan informasi meta kanggo collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Kita nggunakake skema sing diumumake sadurunge kanggo pesen kasebut. Ing kasus iki, aku nggunakake cara .cast amarga kita ora perlu ngenteni asil saka agen, nanging iku worth sebutno sing cara ngirim pesen menyang topik:

  1. cast - ora mblokir amarga ora nyana asil. Sampeyan ora bisa ngirim asil menyang topik liyane minangka pesen.

  2. ngirim - ora mblokir amarga ora nyana asil. Sampeyan bisa nemtokake agen ing topik sing bakal dituju.

  3. takon - ngenteni asil. Sampeyan bisa nemtokake agen ing topik sing bakal dituju.

Dadi, iku kabeh karo agen kanggo dina iki!

Tim ngimpi

Ing bab pungkasan aku janji kanggo nulis ing bagean iki printah. Kaya sing wis kasebut sadurunge, printah ing faust minangka bungkus ing klik. Nyatane, faust mung nempelake perintah khusus menyang antarmuka nalika nemtokake tombol -A

Sawise agen announced ing agen.py nambah fungsi karo decorator a app.commandnelpon cara matak у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Dadi, yen kita nelpon dhaptar prentah, printah anyar kita bakal ana:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Kita bisa nggunakake kaya wong liya, mula ayo miwiti maneh buruh faust lan miwiti koleksi sekuritas lengkap:

> faust -A horton.agents start-collect-securities

Apa sing bakal kelakon mengko?

Ing bagean sabanjure, nggunakake agen sing isih ana minangka conto, kita bakal nimbang mekanisme sink kanggo nggoleki ekstrem ing harga penutupan dagang kanggo taun lan peluncuran agen cron.

Iku kabeh kanggo dina iki! Matur nuwun kanggo maca :)

Kode kanggo bagean iki

Tugas latar mburi ing Faust, Part II: Agen lan Tim

P.S. Ing bagean pungkasan aku ditakoni babagan faust lan confluent kafka (fitur apa konfluen duwe?). Kayane konfluen luwih fungsional kanthi pirang-pirang cara, nanging kasunyatane faust ora duwe dhukungan klien lengkap kanggo konfluen - iki nderek saka katrangan babagan watesan klien ing doc.

Source: www.habr.com

Add a comment