Background tasks on Faust, Part II: Agents and Commands

Table of contents

  1. Part I: Introduction

  2. Part II: Agents and Commands

What are we doing here?

So, part two. As mentioned earlier, in this part we will do the following:

  1. Write a small client for alphavantage on aiohttp, with requests to the endpoints we need.

  2. Build an agent that collects data on securities and the meta information about them.

That is what we will do for the project itself. As for exploring faust, we will learn how to write agents that process a stream of events from kafka, and how to write commands (a wrapper around click), in our case for manually pushing a message into the topic that the agent watches.

Preparation

AlphaVantage Client

First, let's write a small aiohttp client for requests to alphavantage.

alphavantage.py

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Actually, everything is clear from the code:

  1. The AlphaVantage API is designed quite simply and neatly, so I decided to route all requests through the _construct_query method, which in turn makes the http call.

  2. I convert all fields to snake_case for convenience.

  3. And the logger.catch decorator gives nice, informative traceback output.
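
The first two points can be tried out without any network access. The snippet below is a minimal stdlib-only sketch: `build_request` mirrors how `_construct_query` merges its parameters, and `to_snake_case` is a regex-based approximation standing in for `stringcase.snakecase` (it is not that library's implementation). The endpoint URL, the `demo` key and the sample field values are assumptions made for illustration.

```python
import re
from typing import Any, Dict, Tuple
from urllib.parse import urljoin

# Assumed endpoint for illustration; the real value lives in horton.config.
API_ENDPOINT = "https://www.alphavantage.co/"


def build_request(function: str, api_key: str, **kwargs: Any) -> Tuple[str, Dict[str, Any]]:
    """Return the URL and query parameters for one API call."""
    base_params = {"datatype": "json", "apikey": api_key}
    # Later keys win, so the client-level parameters override caller kwargs.
    params = {"function": function, **kwargs, **base_params}
    return urljoin(API_ENDPOINT, "query/"), params


def to_snake_case(name: str) -> str:
    """Approximate snake_case conversion for camelCase or spaced field names."""
    # Put an underscore at every lowercase/digit -> uppercase boundary.
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    # Collapse any run of non-alphanumeric characters into one underscore.
    name = re.sub(r"[^0-9A-Za-z]+", "_", name)
    return name.strip("_").lower()


def format_fields(data: Dict[str, Any]) -> Dict[str, Any]:
    """Rename every key of a response dictionary to snake_case."""
    return {to_snake_case(field): item for field, item in data.items()}


url, params = build_request("OVERVIEW", "demo", symbol="IBM")
# Made-up sample values, only the key names matter here.
record = format_fields({"assetType": "Stock", "ipoDate": "2000-01-01"})
```

Because `base_params` is unpacked last, a caller cannot accidentally override `apikey` or `datatype` through keyword arguments, which matches the order used in the client above.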

P.S. Don't forget to add the alphavantage token locally to config.yml, or export the environment variable HORTON_SERVICE_APIKEY. The token itself is obtained from AlphaVantage.
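
A minimal sketch of how the key could be picked up from the environment. Only the variable name `HORTON_SERVICE_APIKEY` comes from the note above; the helper `load_api_key` and its error message are hypothetical, and the project's real `horton.config` module is not shown in this article.

```python
import os
from typing import Mapping, Optional


def load_api_key(env: Optional[Mapping[str, str]] = None) -> str:
    """Read the AlphaVantage key from the environment (hypothetical helper)."""
    env = os.environ if env is None else env
    api_key = env.get("HORTON_SERVICE_APIKEY", "").strip()
    if not api_key:
        # Fail early instead of sending unauthenticated requests later.
        raise RuntimeError("HORTON_SERVICE_APIKEY is not set")
    return api_key
```

Passing the mapping explicitly keeps the helper easy to test; in normal use it would be called without arguments.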

CRUD class

We will have a securities collection for storing meta information about securities.

database/security.py

In my opinion, there is nothing to explain here, and the base class itself is quite simple.

get_app()

Let's add a function that creates the application object in app.py

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

For now we create the application in the simplest possible way; a little later we will extend it. So as not to keep you waiting, though, see the reference for the App class. I also recommend taking a look at the settings class, since it is responsible for most of the settings.

Main part

An agent for collecting and saving the list of securities

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

So, first we get the faust application object, which is quite simple. Next, we explicitly declare a topic for our agent... Here it is worth saying what a topic is, what the internal parameter is, and how this could be organized differently.

  1. Topics in kafka: if you want the exact definition, it is better to read the official documentation, or you can read the summary on Habr (in Russian), which also covers everything quite accurately :)

  2. The internal parameter, described quite well in the faust docs, lets us configure the topic directly in code. Naturally, this means the parameters provided by the faust developers, for example: retention, retention policy (delete by default, but you can also set compact), and the number of partitions per topic (partitions, for example to make it smaller than the global value of the faust application).

  3. In general, the agent can create a managed topic with the global values on its own; however, I like to declare everything explicitly. Besides, some parameters of the topic (for example, the number of partitions or the retention policy) cannot be configured in the agent declaration.

    Here is how it could look without defining the topic manually:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Well, now let's describe what our agent will do :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

So, at the start of the agent we open an aiohttp session for the requests made through our client. This means that when the worker starts and our agent is launched, a session is opened right away: one session for the whole time the worker is running (or several, if you change the agent's concurrency parameter from its default of one).

Next, we follow the stream of messages from our topic (we put the message into _, since in this agent we don't care about its content). If there are messages at the current offset, we process them; otherwise our loop waits for them to arrive. Inside the loop we log that a message was received, fetch the list of active securities (get_securities returns only active ones by default, see the client code) and save them to the database, checking whether a security with the same ticker and exchange is already there; if it is, that security is simply updated.
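
The control flow described above can be imitated with the standard library alone. This is a sketch under stated assumptions, not Faust code: an `asyncio.Queue` stands in for the topic, `FakeSession` for `aiohttp.ClientSession`, and `InMemorySecurityCRUD` for the article's `SecurityCRUD` (whose real implementation is not shown here). The `None` sentinel and the sample securities are made up so that the example terminates; a real agent stream does not end.

```python
import asyncio
from typing import Any, Dict, List, Tuple


class InMemorySecurityCRUD:
    """Hypothetical stand-in: stores documents keyed by (symbol, exchange)."""

    def __init__(self) -> None:
        self.documents: Dict[Tuple[str, str], Dict[str, Any]] = {}

    async def update_one(
        self, query: Dict[str, str], data: Dict[str, Any], upsert: bool = False
    ) -> None:
        key = (query["symbol"], query["exchange"])
        if key in self.documents:
            self.documents[key].update(data)  # known security: update in place
        elif upsert:
            self.documents[key] = dict(data)  # unknown security: insert it


class FakeSession:
    """Hypothetical stand-in for an HTTP session; counts how often it is opened."""

    opened = 0

    async def __aenter__(self) -> "FakeSession":
        FakeSession.opened += 1
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        return None


async def collect_securities(
    stream: asyncio.Queue, crud: InMemorySecurityCRUD, listings: List[Dict[str, str]]
) -> int:
    """Handle trigger messages until the None sentinel arrives."""
    handled = 0
    async with FakeSession():  # opened once, before the first message
        while True:
            message = await stream.get()  # waits here while the topic is empty
            if message is None:
                break
            for security in listings:
                await crud.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]},
                    security,
                    upsert=True,
                )
            handled += 1
    return handled


async def main() -> Tuple[int, int, int]:
    stream: asyncio.Queue = asyncio.Queue()
    crud = InMemorySecurityCRUD()
    listings = [
        {"symbol": "AAA", "exchange": "NYSE", "name": "Example A"},
        {"symbol": "BBB", "exchange": "NASDAQ", "name": "Example B"},
    ]
    for message in (b"", b"", None):  # two empty triggers, then stop
        stream.put_nowait(message)
    handled = await collect_securities(stream, crud, listings)
    return handled, len(crud.documents), FakeSession.opened


result = asyncio.run(main())
```

Two trigger messages are handled, yet only two documents exist and the session was opened once: the second pass updates the existing records instead of duplicating them.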

Let's launch our creation!

> docker-compose up -d
... starting containers ...
> faust -A horton.agents worker --without-web -l info

P.S. I won't cover the features of faust's web component in these articles, so we set the corresponding flag.

In our launch command we told faust where to look for the application object and what to do with it (start a worker), with the info log level. We get the following output:

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... logs, logs, logs ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

It's alive!!!

Let's look at the partition set. As we can see, a topic was created with the name we specified in the code and with the default number of partitions (8, taken from topic_partitions, a parameter of the application object), since we did not set an individual value for our topic (via partitions). The agent launched in the worker was assigned all 8 partitions because it is the only one, but this will be discussed in more detail in the part about clustering.
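
To see why a single worker ends up with every partition, here is a deliberately simplified round-robin split. It is only an illustration of the idea: Faust's real partition assignor is more involved and is not reproduced here, and the worker names are hypothetical.

```python
from typing import Dict, List


def split_partitions(partition_count: int, workers: List[str]) -> Dict[str, List[int]]:
    """Deal partition numbers out to workers in round-robin order."""
    assignment: Dict[str, List[int]] = {worker: [] for worker in workers}
    for partition in range(partition_count):
        owner = workers[partition % len(workers)]
        assignment[owner].append(partition)
    return assignment


single = split_partitions(8, ["worker-1"])
pair = split_partitions(8, ["worker-1", "worker-2"])
```

With one worker, `single` maps it to partitions 0 through 7, which corresponds to the `{0-7}` shown in the table; with two workers each would hold four partitions in this simplified model.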

Well, now we can go to another terminal window and send an empty message to our topic:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

P.S. With @ we indicate that we are sending the message to the topic named "collect_securities".

In this case the message went to partition 6. You can check this by opening kafdrop at localhost:9000

Going back to the terminal window with our worker, we will see the joyful message logged with loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

We can also look into mongo (using Robo3T or Studio3T) and see that the securities are in the database:

I'm not a billionaire, so we make do with the first option (Robo3T).

Happiness and joy - the first agent is ready :)

The agent is ready, long live the new agent!

Yes, gentlemen, we have covered only 1/3 of the path laid out in this article, but don't be discouraged, because from here on it gets easier.

So now we need an agent that collects the meta information and puts it into the collection document:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Since this agent will process information about a specific security, we need to pass the ticker (symbol) of that security in the message. For this purpose faust has Records: classes that declare the schema of the messages in the agent's topic.

So let's go to records.py and describe what a message for this topic should look like:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

As you might have guessed, faust uses python type annotations to describe the message schema, which is why the minimum python version supported by the library is 3.6.
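
To make the idea of an annotation-driven schema concrete without depending on Faust, here is a stdlib sketch that uses a `dataclass` as a stand-in for `faust.Record`. The `to_json` and `from_json` helpers are hypothetical and only imitate the idea of serializing and validating a typed message; Faust's own serialization differs.

```python
import json
from dataclasses import asdict, dataclass, fields


@dataclass
class CollectSecurityOverview:
    """Stand-in for the faust.Record of the same name (not a Faust class)."""

    symbol: str
    exchange: str

    def to_json(self) -> bytes:
        """Serialize the message to JSON bytes."""
        return json.dumps(asdict(self)).encode()

    @classmethod
    def from_json(cls, payload: bytes) -> "CollectSecurityOverview":
        """Parse JSON bytes, using the annotations as the schema."""
        raw = json.loads(payload)
        for field in fields(cls):
            # Reject missing or mistyped fields before building the object.
            if not isinstance(raw.get(field.name), field.type):
                raise ValueError(f"field {field.name!r} must be {field.type.__name__}")
        return cls(**{field.name: raw[field.name] for field in fields(cls)})


message = CollectSecurityOverview(symbol="AAA", exchange="NYSE")
restored = CollectSecurityOverview.from_json(message.to_json())
```

The consumer gets attribute access (`restored.symbol`) instead of raw dictionary lookups, which is the same convenience the agent relies on when it reads `event.symbol`.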

Let's go back to the agent, set the types and finish it:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

As you can see, we pass a new parameter with the schema, value_type, to the topic initialization method. Beyond that, everything follows the same pattern, so I see no point in dwelling on anything else.

Well, the final touch is to add a call to the meta information agent inside collect_securities:

....
for security in securities:
    await SecurityCRUD.update_one(
        {"symbol": security["symbol"], "exchange": security["exchange"]},
        security,
        upsert=True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol=security["symbol"], exchange=security["exchange"])
    )
....

We use the previously declared schema for the message. In this case I used the .cast method, since we don't need to wait for the result from the agent, but it is worth listing the ways to send a message to a topic:

  1. cast - does not block, since it does not expect a result. The result cannot be sent to another topic as a message.

  2. send - does not block, since it does not expect a result. You can specify an agent to whose topic the result will go.

  3. ask - waits for the result. You can specify an agent to whose topic the result will go.
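
The difference between fire-and-forget and waiting for a reply can be shown with a toy agent built on `asyncio`. This is a hypothetical sketch, not Faust's implementation: `MiniAgent` is a queue plus one consumer task, and it only contrasts `cast` with `ask`; the reply routing available through `send` is not modeled.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple


class MiniAgent:
    """Hypothetical toy agent: an inbox queue drained by one consumer task."""

    def __init__(self, handler: Callable[[Any], Awaitable[Any]]) -> None:
        self._handler = handler
        self._inbox: asyncio.Queue = asyncio.Queue()

    async def run(self) -> None:
        """Consume messages forever, answering those that carry a reply slot."""
        while True:
            value, reply = await self._inbox.get()
            result = await self._handler(value)
            if reply is not None:
                reply.set_result(result)  # only ask() supplies a reply slot
            self._inbox.task_done()

    async def cast(self, value: Any) -> None:
        """Fire and forget: enqueue the message and ignore the result."""
        await self._inbox.put((value, None))

    async def ask(self, value: Any) -> Any:
        """Enqueue the message and wait until the handler has produced a result."""
        reply = asyncio.get_running_loop().create_future()
        await self._inbox.put((value, reply))
        return await reply

    async def join(self) -> None:
        """Wait until every queued message has been handled."""
        await self._inbox.join()


async def main() -> Tuple[List[str], str]:
    seen: List[str] = []

    async def handler(value: str) -> str:
        seen.append(value)
        return value.upper()

    agent = MiniAgent(handler)
    consumer = asyncio.create_task(agent.run())

    await agent.cast("aaa")          # returns as soon as the message is queued
    answer = await agent.ask("bbb")  # returns only after the handler has run
    await agent.join()

    consumer.cancel()
    return seen, answer


seen, answer = asyncio.run(main())
```

In this model `cast` hands the message over and moves on, while `ask` suspends the caller until the handler's return value comes back, which is why the article's agents use `cast` when the result is not needed.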

So, that's all about agents for today!

The dream command

The last thing I promised to cover in this part is commands. As mentioned earlier, commands in faust are a wrapper around click. In fact, faust simply attaches our custom command to its own interface when the -A option is specified.

After the agents declared in agents.py, add a function with the app.command decorator that calls the cast method of collect_securities:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

So, if we print the list of commands, our new command will be in it:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.
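
Judging by the listing above, the function name `start_collect_securities` shows up as `start-collect-securities`, and its docstring becomes the help text. The stdlib sketch below imitates that registration idea with a plain dictionary; the `command` decorator, the `COMMANDS` registry and `run_command` are hypothetical and do not reproduce how Faust or click implement it.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

# Hypothetical registry: CLI-style name -> async function.
COMMANDS: Dict[str, Callable[[], Awaitable[Any]]] = {}


def command(func: Callable[[], Awaitable[Any]]) -> Callable[[], Awaitable[Any]]:
    """Register an async function under a hyphenated CLI-style name."""
    COMMANDS[func.__name__.replace("_", "-")] = func
    return func


@command
async def start_collect_securities() -> str:
    """Collect securities and overview."""
    return "triggered"  # the article's command casts a message to the agent here


def run_command(name: str) -> Any:
    """Look a command up by name and run it to completion."""
    return asyncio.run(COMMANDS[name]())


def help_lines() -> List[str]:
    """Build 'name  description' rows from the registered docstrings."""
    return [f"{name}  {func.__doc__}" for name, func in sorted(COMMANDS.items())]
```

Calling `run_command("start-collect-securities")` runs the coroutine to completion, much like a CLI entry point would for an async command.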

We can use the new command like any other, so let's restart the faust worker and kick off a full collection of securities:

> faust -A horton.agents start-collect-securities

What happens next?

In the next part, using the remaining agents as an example, we will look at the sink mechanism for finding extremes in the closing prices over the year, and at launching agents by cron.

That's all for today! Thanks for reading :)

Code for this part

P.S. Under the previous part I was asked about faust and confluent kafka (what features does confluent have?). It seems that confluent is more functional in many ways, but the fact is that faust does not have full client support for confluent. This follows from the description of client restrictions in the docs.

Source: www.hab.com
