Tugas latar dina Faust, Bagian II: Agén sareng Tim

Tugas latar dina Faust, Bagian II: Agén sareng Tim

daptar eusi

  1. Bagian I: Pendahuluan

  2. Bagian II: Agén sareng Tim

Keur naon urang di dieu?

Janten, janten, bagian kadua. Sakumaha anu diserat sateuacana, urang bakal ngalakukeun ieu:

  1. Hayu urang nulis hiji klien leutik keur alphavantage on aiohttp kalawan requests pikeun titiktungtung kami butuh.

  2. Hayu urang ngadamel agén anu bakal ngumpulkeun data ngeunaan sekuritas sareng inpormasi meta ngeunaan aranjeunna.

Tapi, ieu anu bakal urang laksanakeun pikeun proyék éta sorangan, sareng dina hal panalungtikan anu gancang, urang bakal diajar kumaha nyerat agén anu ngolah acara aliran tina kafka, ogé kumaha cara nyerat paréntah (klik wrapper), dina kasus urang - pikeun pesen push manual ka topik nu ngawas agén.

palatihan

Klién AlphaVantage

Kahiji, hayu urang nulis aiohttp klien leutik pikeun requests mun alphavantage.

alphavantage.py

spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Sabenerna, sagalana jelas ti eta:

  1. AlphaVantage API cukup basajan tur beautifully dirancang, jadi kuring mutuskeun pikeun nyieun sagala requests ngaliwatan métode construct_query dimana dina gilirannana aya panggero http.

  2. Kuring mawa sagala widang ka snake_case pikeun genah.

  3. Muhun, nu logger.catch hiasan pikeun kaluaran traceback geulis tur informatif.

PS Ulah poho pikeun nambahkeun token alphavantage lokal pikeun config.yml, atawa ékspor variabel lingkungan HORTON_SERVICE_APIKEY. Kami nampi token di dieu.

kelas CRUD

Kami bakal gaduh koleksi sekuritas pikeun nyimpen inpormasi meta ngeunaan sekuritas.

database/security.py

Dina pamanggih kuring, teu perlu ngajelaskeun nanaon di dieu, jeung kelas dasar sorangan cukup basajan.

get_app()

Hayu urang tambahkeun fungsi pikeun nyieun hiji objek aplikasi dina app.py

spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Pikeun ayeuna urang bakal gaduh kreasi aplikasi pangbasajanna, saeutik engké urang bakal dilegakeun eta, kumaha oge, dina urutan teu tetep anjeun ngantosan, didieu rujukan ka App-kelas. Kuring ogé mamatahan anjeun nyandak katingal dina kelas setelan, saprak éta jawab lolobana setelan.

utama

Agen pikeun ngumpulkeun jeung ngajaga daptar jaminan

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Janten, mimitina urang kéngingkeun obyék aplikasi faust - éta saderhana. Salajengna, urang eksplisit dibewarakeun topik pikeun agén kami ... Di dieu eta sia mentioning naon éta, naon parameter internal tur kumaha ieu bisa diatur béda.

  1. Topik dina kafka, upami urang hoyong terang definisi pasti, langkung saé maca peureum. dokumén, atawa anjeun bisa maca kompendium dina Habré dina basa Rusia, dimana sadayana ogé ditingalikeun sacara akurat :)

  2. Parameter internal, Dijelaskeun lumayan lah dina doc faust, ngamungkinkeun urang pikeun ngonpigurasikeun topik langsung dina kode, tangtosna, ieu hartosna parameter anu disayogikeun ku pamekar faust, contona: ingetan, kawijakan ingetan (sacara standar ngahapus, tapi anjeun tiasa nyetél padet), jumlah partisi per topik (partitionsngalakukeun, contona, kirang ti significance global aplikasi faust).

  3. Sacara umum, agén bisa nyieun topik junun kalawan nilai global, kumaha oge, kuring resep dibewarakeun sagalana eksplisit. Salaku tambahan, sababaraha parameter (contona, jumlah partisi atanapi kawijakan ingetan) topik dina iklan agén teu tiasa dikonpigurasi.

    Ieu kumaha sigana tanpa nangtukeun topik sacara manual:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Nya, ayeuna hayu urang ngajelaskeun naon anu bakal dilakukeun ku agén kami :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Ku kituna, dina awal agén, urang muka sési aiohttp pikeun requests ngaliwatan klien kami. Ku kituna, nalika ngamimitian pagawe, nalika agén urang diluncurkeun, sési bakal langsung dibuka - hiji, pikeun sakabéh waktos worker ngajalankeun (atawa sababaraha, lamun ngarobah parameter. kasaluyuan ti agén sareng unit standar).

Salajengna, urang turutan aliran (urang nempatkeun pesen dina _, Kusabab urang, dina agén ieu, teu paduli ngeunaan eusi) pesen ti topik urang, upami aranjeunna aya di offset ayeuna, disebutkeun siklus urang bakal ngadagoan datangna maranéhanana. Nya, di jero loop kami, urang asupkeun resi pesen, kéngingkeun daptar sekuritas (get_securities ngan ukur aktip sacara standar, tingali kode klien) sareng simpen kana pangkalan data, mariksa upami aya kaamanan sareng ticker anu sami sareng bursa dina database , lamun aya, teras eta (kertas) ngan bakal diropéa.

Hayu urang ngajalankeun kreasi urang!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Keunggulan PS komponén wéb Kuring moal nganggap faust dina artikel, jadi urang nyetel bandéra luyu.

Dina paréntah peluncuran kami, kami ngawartoskeun faust dimana néangan objék aplikasi tur naon anu kudu dipigawé kalayan eta (ngajalankeun worker a) kalawan tingkat kaluaran info log. Urang meunang kaluaran handap:

spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Ieu hirup!!!

Hayu urang tingali set partisi. Sakumaha anu urang tingali, topik diciptakeun kalayan nami anu ditunjuk dina kode, jumlah partisi standar (8, dicandak tina topic_partitions - parameter obyék aplikasi), sabab kami henteu netepkeun nilai individu pikeun topik kami (via partisi). Agén dibuka dina worker ditugaskeun sadayana 8 partitions, saprak éta ngan hiji, tapi ieu bakal dibahas dina leuwih jéntré dina bagian ngeunaan clustering.

Nya, ayeuna urang tiasa angkat ka jandela terminal anu sanés sareng ngirim pesen kosong ka topik urang:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS ngagunakeun @ kami nunjukkeun yén kami ngirim pesen ka topik anu namina "collect_securities".

Dina hal ieu, pesen indit ka partisi 6 - anjeun tiasa pariksa ieu ku bade kafdrop on localhost:9000

Bade ka jandela terminal sareng padamel urang, urang bakal ningali pesen bagja anu dikirim nganggo loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Urang ogé tiasa ningali kana mongo (ngagunakeun Robo3T atanapi Studio3T) sareng ningali yén jaminan aya dina pangkalan data:

Abdi sanés jutawan, sareng ku kituna kami sugema ku pilihan ningali anu munggaran.

Tugas latar dina Faust, Bagian II: Agén sareng TimTugas latar dina Faust, Bagian II: Agén sareng Tim

Kabagjaan sareng kabagjaan - agén munggaran parantos siap :)

Agén siap, hirup agén anyar!

Sumuhun, gentlemen, kami geus ngan nutupan 1/3 tina jalur disiapkeun ku artikel ieu, tapi ulah discouraged, sabab ayeuna bakal leuwih gampang.

Janten ayeuna urang peryogi agén anu ngumpulkeun inpormasi meta sareng nempatkeun kana dokumen koleksi:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Kusabab agén ieu bakal ngolah inpormasi ngeunaan kaamanan khusus, urang kedah nunjukkeun ticker (simbol) kaamanan ieu dina suratna. Pikeun tujuan ieu di faust aya Records - kelas anu nyatakeun skéma pesen dina topik agén.

Dina hal ieu, hayu urang buka records.py sareng ngajelaskeun kumaha pesen pikeun topik ieu kedah sapertos:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Sakumaha anjeun panginten, faust nganggo anotasi jinis python pikeun ngajelaskeun skéma pesen, naha éta versi minimum anu dirojong ku perpustakaan nyaéta. 3.6.

Hayu urang balik deui ka agén, setel jinis sareng tambahkeun:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Sakumaha anjeun tiasa tingali, urang lulus parameter anyar kalawan skéma metoda initialization topik - value_type. Salajengna, sadayana nuturkeun skéma anu sami, janten kuring henteu ningali titik naon waé anu sanés.

Nya, sentuhan terakhir nyaéta nambihan telepon ka agén pangumpulan inpormasi meta pikeun collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Kami nganggo skéma anu diumumkeun sateuacana pikeun pesen. Dina hal ieu, kuring nganggo metodeu .cast sabab urang henteu kedah ngantosan hasil tina agén, tapi kedah disebatkeun yén cara ngirim pesen ka topik:

  1. matak - teu meungpeuk sabab teu nyangka hasilna. Anjeun teu bisa ngirim hasil ka topik sejen salaku pesen.

  2. ngirim - teu meungpeuk sabab teu nyangka hasilna. Anjeun tiasa nangtukeun agén dina topik anu hasilna bakal angkat.

  3. nanya - ngantosan hasilna. Anjeun tiasa nangtukeun agén dina topik anu hasilna bakal angkat.

Janten, éta sadayana sareng agén dinten ayeuna!

Tim impian

Hal anu terakhir anu kuring jangji pikeun nyerat dina bagian ieu nyaéta paréntah. Sakumaha anu disebatkeun sateuacana, paréntah di faust mangrupikeun bungkus dina klik. Kanyataanna, faust ngan saukur ngagantelkeun paréntah khusus urang kana antarmukana nalika netepkeun konci -A

Saatos agén ngumumkeun dina agents.py nambahkeun fungsi kalawan decorator a app.commandnelepon métode nyitak у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Janten, upami urang nyauran daptar paréntah, paréntah énggal urang bakal aya:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Urang tiasa nganggo éta sapertos saha waé, janten hayu urang balikan deui pagawé anu gancang sareng ngamimitian koleksi jaminan anu lengkep:

> faust -A horton.agents start-collect-securities

Naon anu bakal kajadian salajengna?

Dina bagian salajengna, ngagunakeun agén sésana sabagé conto, urang bakal mertimbangkeun mékanisme tilelep pikeun néangan extremes dina harga nutup dagang pikeun sataun jeung peluncuran cron agén.

Éta sadayana kanggo dinten ieu! Hatur nuhun pikeun maca :)

Kode pikeun bagian ieu

Tugas latar dina Faust, Bagian II: Agén sareng Tim

PS Dina bagian panungtungan kuring ditanya ngeunaan faust na confluent kafka (fitur naon anu confluent gaduh?). Sigana yén confluent langkung fungsional dina sababaraha cara, tapi kanyataanna nyaéta faust henteu gaduh dukungan klien pinuh pikeun confluent - ieu di handap tina déskripsi ngeunaan larangan klien dina doc.

sumber: www.habr.com

Tambahkeun komentar