Galuega i tua i luga o Faust, Vaega II: Sui Usufono ma Au

Galuega i tua i luga o Faust, Vaega II: Sui Usufono ma Au

Lisi o Mataupu

  1. Vaega I: Folasaga

  2. Vaega II: Suisuaga ma Au

O le a le mea o tatou faia iinei?

O lea la, o le vaega lona lua. E pei ona tusia muamua, i totonu o le a tatou faia mea nei:

  1. Se'i o tatou tusi se tama'i tagata fa'atau mo le alphavantage i luga ole aiohttp ma talosaga mo fa'ai'uga tatou te mana'omia.

  2. Sei o tatou faia se sooupu o le a aoina faʻamatalaga i luga o puipuiga ma faʻamatalaga meta i luga oi latou.

Ae, o le mea lenei o le a tatou faia mo le poloketi lava ia, ma i tulaga o suʻesuʻega faʻavave, o le a tatou aʻoaʻoina pe faʻapefea ona tusia ni sui e faʻagasolo faʻasologa o mea mai le kafka, faʻapea foʻi ma le auala e tusi ai poloaiga (kiliki afifi), i la tatou mataupu - mo savali tulei tusilima i le autu o loʻo mataʻituina e le sooupu.

Sauniuniga

AlphaVantage Client

Muamua, se'i o tatou tusi se tama'i aiohttp client mo talosaga i le alphavantage.

alphavantage.py

faaleaga

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

O le mea moni, e manino mea uma mai ai:

  1. Ole AlphaVantage API e faigofie ma matagofie le mamanuina, o lea na ou filifili ai e faia uma talosaga e ala i le metotia construct_query o fea e i ai le telefoni http.

  2. Ou te aumaia fanua uma i snake_case mo le faʻafaigofieina.

  3. Ia, o le logger.catch teuteuga mo le matagofie ma le fa'amatalaga fa'asologa o galuega.

PS Aua nei galo e faaopoopo le faailoga alphavantage i le lotoifale i le config.yml, pe auina atu i fafo le fesuiaiga o le siosiomaga HORTON_SERVICE_APIKEY. Matou te maua se faʻailoga iinei.

vasega CRUD

O le a iai a matou fa'aputuga tupe e teu ai fa'amatalaga meta e uiga i fa'amalumaluga.

database/security.py

I loʻu manatu, e leai se manaʻoga e faʻamatalaina se mea iinei, ma o le vasega faavae lava ia e faigofie tele.

maua_app()

Sei o tatou fa'aopoopoina se galuega mo le fatuina o se mea fa'aoga i totonu app.py

faaleaga

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Mo le taimi nei o le a matou maua le faʻaoga faigofie, i se taimi mulimuli ane o le a matou faʻalauteleina, peitaʻi, ina ia le faʻatali oe, iinei faasinomaga i App-vasega. Ou te fautua atu foi ia te oe e te tilotilo i le vasega tulaga, talu ai e nafa ma le tele o tulaga.

O le tino tino

Sui sooupu mo le aoina ma le tausia o se lisi o faamalumaluga

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

O lea la, muamua tatou te maua le faust application object - e fai si faigofie. Ma le isi, matou te faʻaalia manino se autu mo le matou sui ... O iinei e taua le taʻua o le mea, o le a le mea i totonu ma le auala e mafai ai ona faʻatulagaina ese.

  1. Autu i le kafka, afai tatou te fia iloa le faʻamatalaga saʻo, e sili atu le faitau ese. pepa, pe mafai ona e faitau fa'asalalauga i luga o Habré i le gagana Rusia, lea o loʻo atagia mai ai mea uma lava :)

  2. Parameter totonu, faʻamatala lelei i le faust doc, faʻatagaina i matou e faʻapipiʻi saʻo le autu i totonu o le code, ioe, o lona uiga o tapulaʻa na tuʻuina atu e le au faust developers, mo se faʻataʻitaʻiga: faʻatumauina, faʻatumauina faiga faʻavae (e ala i le faʻamalo tape, ae e mafai ona e setiina. māopoopo), numera o vaeluaga ile autu (fevaevaeaigae fai, mo se faataitaiga, itiiti ifo nai lo taua o le lalolagi talosaga faust).

  3. I se tulaga lautele, e mafai e le sooupu ona fatuina se autu pulea ma tulaga taua o le lalolagi, ae ui i lea, ou te fiafia e taʻu saʻo mea uma. E le gata i lea, o nisi faʻamaufaʻailoga (mo se faʻataʻitaʻiga, numera o vaeluaga poʻo le faʻamauina o faiga faʻavae) o le autu i le faʻasalalauga faʻasalalauga e le mafai ona faʻatulagaina.

    O foliga ia e aunoa ma le fa'amalamalamaina ma le lima o le autu:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Ia, se'i o tatou fa'amatala le mea o le a fai e le tatou sui :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

O lea la, i le amataga o le sooupu, matou te tatalaina se sauniga aiohttp mo talosaga e ala i le matou tagata faʻatau. O le mea lea, pe a amataina se tagata faigaluega, pe a faʻalauiloa le matou sui, o le a vave ona tatalaina se sauniga - tasi, mo le taimi atoa o loʻo tamoe ai le tagata faigaluega (pe tele, pe a e suia le parakalafa maliega mai se sooupu e iai se iunite faaletonu).

Ma le isi, matou te mulimuli i le vaitafe (matou te tuʻuina le feʻau i totonu _, talu ai o matou, i lenei sui, e le popole i mea o loʻo i ai) o feʻau mai la matou autu, pe afai latou te i ai i le taimi nei offset, a leai o le a faʻatali la matou taamilosaga mo lo latou taunuu mai. Ia, i totonu o la matou matasele, matou te faʻamauina le lisiti o le feʻau, maua se lisi o le gaioiga (get_securities toe foʻi naʻo le gaioiga, vaʻai le code client) faʻamaufaʻailoga ma teu i luga o faʻamaumauga, siaki pe oi ai se puipuiga ma le siaki tutusa ma fefaʻasoaaʻi i totonu o faʻamaumauga, pe a iai, ona toe faʻafouina lea (le pepa).

Se'i amata la tatou foafoaga!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS Features vaega i luga ole laiga O le a ou le mafaufau i le faust i tala, o lea matou te setiina ai le fuʻa talafeagai.

I la matou faʻatonuga faʻalauiloa, na matou taʻu atu i le faust le mea e suʻe ai le mea faʻaoga ma le mea e fai i ai (faʻalauiloa se tagata faigaluega) faʻatasi ai ma le faʻamatalaga o faʻamatalaga faʻapipiʻi tulaga. Matou te maua le faʻatinoga o loʻo i lalo:

faaleaga

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Ua ola!!!

Se'i tatou va'ai i le seti vaeluaga. E pei ona mafai ona tatou vaʻavaʻai, na faia se autu ma le igoa na matou filifilia i le code, le numera le aoga o vaega (8, ave mai autu_vaega - fa'ata'ita'iga mea fa'aoga), talu ai matou te le'i fa'ailoaina se tau fa'atatau mo la matou autu (e ala i vaega). O le sui faʻalauiloaina i le tagata faigaluega e tuʻuina atu uma vaega e 8, talu ai e naʻo le tasi, ae o le a talanoaina atili auiliiliga i le vaega e uiga i le faʻapipiʻiina.

Ia, o lea e mafai ona tatou o atu i se isi faʻamalama faʻamalama ma lafo se feʻau gaogao i la tatou autu:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS faʻaaogaina @ matou te faʻaalia o loʻo matou lafoina se feʻau i se autu e taʻua "collect_securities".

I lenei tulaga, o le savali na alu i le vaeluaga 6 - e mafai ona e siakiina lenei mea e ala i le alu i le kafdrop on localhost:9000

O le alu i le faʻamalama faʻamalama ma le matou tagata faigaluega, o le a matou vaʻai i se feʻau fiafia na lafoina e faʻaaoga ai loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

E mafai foi ona tatou vaʻavaʻai i le mongo (faʻaaogaina Robo3T poʻo Studio3T) ma vaʻai o loʻo i totonu o faʻamaumauga faʻamaumauga:

E le o aʻu o se piliona, ma o lea ua matou fiafia i le filifiliga muamua e matamata ai.

Galuega i tua i luga o Faust, Vaega II: Sui Usufono ma AuGaluega i tua i luga o Faust, Vaega II: Sui Usufono ma Au

Fiafia ma le fiafia - ua sauni le sui muamua :)

Ua sauni le sui, ia soifua le sui fou!

Ioe, alii, ua na o le 1/3 o le ala na saunia e lenei tusitusiga, ae aua le lotovaivai, aua o lea ua faigofie.

O lea la, matou te manaʻomia se sooupu e aoina faʻamatalaga meta ma tuʻu i totonu o se pepa aoina:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Talu ai o le a faʻagasolo e lenei sui faʻamatalaga e uiga i se puipuiga patino, e tatau ona matou faʻaalia le faʻailoga (faailoga) o lenei puipuiga i le feʻau. Mo lenei faamoemoe i faus e iai faamaumauga — vasega o lo'o fa'ailoa atu le polokalame fe'au i le autu a sui.

I lenei tulaga, tatou o i records.py ma faamatala le uiga o le savali mo lenei autu:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

E pei ona e mateina, faust faʻaaoga le python type annotation e faʻamatala ai le feʻau schema, o le mea lea e mafua ai le laʻititi laʻititi e lagolagoina e le faletusi. 3.6.

Sei o tatou toe foi i le sooupu, seti ituaiga ma faaopoopo i ai:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

E pei ona mafai ona e vaʻaia, matou te pasia se parakalafa fou ma se fuafuaga i le autu o le amataga metotia - value_type. E le gata i lea, o mea uma e mulimuli i le fuafuaga lava e tasi, o lea ou te le iloa ai se mea e nofo ai i se isi lava mea.

Ia, o le pa'i mulimuli o le fa'aopoopoina lea o se vala'au i le meta fa'amatalaga fa'aputuga sui e collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Matou te faʻaaogaina le polokalame na faʻasalalau muamua mo le feʻau. I lenei tulaga, na ou faʻaaogaina le .cast method talu ai tatou te le manaʻomia le faʻatali mo le taunuʻuga mai le sooupu, ae e taua le taʻua o lena mea. auala auina atu se savali i le autu:

  1. lafo - e le poloka aua e le o faamoemoeina se taunuuga. E le mafai ona e auina atu le taunuuga i se isi autu o se savali.

  2. auina atu - e le poloka aua e le o faamoemoeina se taunuuga. E mafai ona e faʻamaonia se sui i le autu e alu i ai le iʻuga.

  3. fesili - fa'atali mo se fa'ai'uga. E mafai ona e faʻamaonia se sui i le autu e alu i ai le iʻuga.

O lea la, o mea uma na i sui mo aso nei!

Le au miti

O le mea mulimuli na ou folafola atu e tusi i lenei vaega o poloaiga. E pei ona taʻua muamua, o faʻatonuga i faus o se afifi faataamilo i kiliki. O le mea moni, faust na o le faʻapipiʻiina o la matou faʻatonuga masani i lona atinaʻe pe a faʻamaonia le -A ki

Ina ua mae'a fa'asalalau sui i totonu agents.py fa'aopoopo se galuega fa'atasi ma se tagata teuteu app.commandvalaau le metotia lafo у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

O le mea lea, afai tatou te taʻua le lisi o poloaiga, o la tatou poloaiga fou o le ai ai i totonu:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

E mafai ona tatou faʻaaogaina e pei o se isi, o lea tatou toe amata le faust worker ma amata se faʻaputuga atoa o faʻamalumaluga:

> faust -A horton.agents start-collect-securities

O le a le mea o le a sosoo ai?

I le isi vaega, e faʻaaoga ai sui o loʻo totoe e fai ma faʻataʻitaʻiga, o le a tatou mafaufau i le masini suʻesuʻe mo le suʻesuʻeina o mea ogaoga i tau tapunia o fefaʻatauaiga mo le tausaga ma le faʻalauiloaina o le cron o sui.

Na pau lava lena mo le aso! Faafetai mo le faitau :)

Code mo lenei vaega

Galuega i tua i luga o Faust, Vaega II: Sui Usufono ma Au

PS I lalo o le vaega mulimuli na fesiligia aʻu e uiga i faust ma confluent kafka (o a mea e iai le confluent?). E foliga mai o le confluent e sili atu ona aoga i le tele o auala, ae o le mea moni e le o maua e faust le lagolago atoatoa o tagata o tausia mo le confluent - e mulimuli mai fa'amatalaga o tapula'a a tagata o tausia ile doc.

puna: www.habr.com

Faaopoopo i ai se faamatalaga