Nā hana hope ma Faust, Māhele II: Nā Agena a me nā Pūʻulu

Nā hana hope ma Faust, Māhele II: Nā Agena a me nā Pūʻulu

Ka papa o nā mea

  1. Mahele I: Hoʻolauna

  2. Mahele II: Na Agena a me na Hui

He aha kā mākou e hana nei ma ʻaneʻi?

No laila, no laila, ka ʻāpana ʻelua. E like me ka mea i kākau mua ʻia, e hana mākou i kēia:

  1. E kākau i kahi mea kūʻai liʻiliʻi no ka alphavantage ma aiohttp me nā noi no nā helu hope a mākou e pono ai.

  2. E hana kākou i luna nāna e hōʻiliʻili i ka ʻikepili no nā palekana a me ka ʻike meta ma luna o lākou.

Akā, ʻo kēia ka mea a mākou e hana ai no ka papahana ponoʻī, a ma ke ʻano o ka noiʻi faust, e aʻo mākou pehea e kākau ai i nā mea hana e hoʻoponopono i nā hanana kahawai mai kafka, a me pehea e kākau ai i nā kauoha (click wrapper), i kā mākou hihia - no nā memo paʻi lima i ke kumuhana a ka luna e nānā nei.

ʻO ka hoʻomākaukauʻana

Mea kūʻai aku AlphaVantage

ʻO ka mua, e kākau i kahi mea kūʻai aiohttp liʻiliʻi no nā noi e alphavantage.

alphavantage.py

mea hao wale

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

ʻOiaʻiʻo, maopopo nā mea a pau mai ia mea:

  1. ʻO ka AlphaVantage API he mea maʻalahi a nani hoʻi, no laila ua hoʻoholo wau e hana i nā noi āpau ma ke ʻano construct_query ma kahi o ka hea he http.

  2. Lawe au i na kihapai a pau i snake_case no ka maʻalahi.

  3. ʻAe, ʻo ka mea hoʻonaninani logger.catch no ka hoʻopuka traceback nani a ʻike.

PS Mai poina e hoʻohui i ka hōʻailona alphavantage ma ka ʻāina iā config.yml, a i ʻole e hoʻokuʻu aku i ka ʻano hoʻololi kaiapuni HORTON_SERVICE_APIKEY. Loaʻa iā mākou kahi hōʻailona maanei.

Papa CRUD

E loaʻa iā mākou kahi hōʻiliʻili palekana e mālama i ka ʻike meta e pili ana i nā palekana.

waihona/security.py

I koʻu manaʻo, ʻaʻohe pono e wehewehe i kekahi mea ma aneʻi, a maʻalahi ka papa kumu.

get_app()

E hoʻohui i kahi hana no ka hana ʻana i kahi mea noi i loko polokalamu.py

mea hao wale

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

I kēia manawa e loaʻa iā mākou ka hana noi maʻalahi, ma hope iki e hoʻonui mākou iā ia, akā naʻe, i ʻole e kali iā ʻoe, eia nā kuhikuhi i App-papa. Ke aʻo aku nei au iā ʻoe e nānā i ka papa hoʻonohonoho, no ka mea nona ke kuleana no ka hapa nui o nā hoʻonohonoho.

ʻO kaʻaoʻao nui

Agena no ka hōʻiliʻili a mālama ʻana i kahi papa inoa o nā securities

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

No laila, loaʻa mua mākou i ka mea noi faust - maʻalahi loa. Ma hope aʻe, hōʻike maopopo mākou i kahi kumuhana no kā mākou luna ... Eia ke kūpono e haʻi i ke ʻano o ia mea, he aha ke ʻano o loko a pehea e hiki ai ke hoʻonohonoho ʻokoʻa.

  1. Nā kumuhana i kafka, inā makemake mākou e ʻike i ka wehewehe kikoʻī, ʻoi aku ka maikaʻi o ka heluhelu ʻana hemo. palapala, a i ʻole hiki iā ʻoe ke heluhelu compendium ma ka hub ma Lūkini, kahi i hōʻike pololei ʻia nā mea āpau :)

  2. Ka palena o loko, i wehewehe maikaʻi ʻia i ka faust doc, hiki iā mākou ke hoʻonohonoho pololei i ke kumuhana i loko o ke code, ʻoiaʻiʻo, ʻo ia ke ʻano o nā ʻāpana i hāʻawi ʻia e nā mea hoʻomohala faust, no ka laʻana: hoʻopaʻa ʻana, kulekele paʻa (ma ka hoʻopau paʻamau, akā hiki iā ʻoe ke hoʻonohonoho. kana olelo hoopomaikai), ka helu o nā ʻāpana no ke kumuhana (nā māhelee hana, no ka laʻana, emi mai koʻikoʻi honua noi faust).

  3. Ma keʻano laulā, hiki i ka ʻelele ke hana i kahi kumuhana i mālama ʻia me nā waiwai honua, akā naʻe, makemake wau e haʻi pololei i nā mea āpau. Eia kekahi, ʻaʻole hiki ke hoʻonohonoho ʻia kekahi mau ʻāpana (no ka laʻana, ka helu o nā ʻāpana a i ʻole ke kulekele paʻa) o ke kumuhana i ka hoʻolaha hoʻolaha.

    Eia ke ʻano o kona ʻano me ka wehewehe ʻole ʻana i ke kumuhana:

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

ʻĀ, ʻānō e wehewehe i ka mea a kā mākou luna e hana ai :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

No laila, i ka hoʻomaka ʻana o ka ʻelele, wehe mākou i kahi hālāwai aiohttp no nā noi ma o kā mākou mea kūʻai aku. No laila, i ka wā e hoʻomaka ai i kahi limahana, i ka wā e hoʻomaka ai kā mākou luna, e wehe koke ʻia kahi hālāwai - hoʻokahi, no ka manawa holoʻokoʻa e holo ana ka mea hana (a i ʻole, inā ʻoe e hoʻololi i ka ʻāpana. kaʻae kūlike mai kahi ʻelele me kahi ʻāpana paʻamau).

A laila, hahai mākou i ke kahawai (kau mākou i ka memo i loko _, no ka mea, ʻaʻole mākou e mālama i ka ʻike) o nā memo mai kā mākou kumuhana, inā aia lākou i ka offset o kēia manawa, inā ʻaʻole e kali kā mākou pōʻai i ko lākou hōʻea ʻana. ʻAe, i loko o kā mākou loop, hoʻopaʻa mākou i ka loaʻa ʻana o ka memo, e kiʻi i kahi papa inoa o nā mea hana (get_securities hoʻi wale i ka hana ma ka paʻamau, ʻike i ka code client) securities a mālama iā ia i ka waihona, e nānā inā aia kahi palekana me ka ticker like a hoʻololi i ka waihona , inā loaʻa, a laila e hoʻonui hou ʻia (ka pepa).

E hoʻomaka kākou i kā mākou hana ʻana!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

Nā hiʻohiʻona PS ʻāpana pūnaewele ʻAʻole wau e noʻonoʻo i ka faus i nā ʻatikala, no laila hoʻonoho mākou i ka hae kūpono.

I kā mākou kauoha hoʻolaha, ua haʻi mākou iā faust i kahi e ʻimi ai i ka mea noi a me ka mea e hana ai me ia (hoʻomaka i kahi mea hana) me ka pae kiʻekiʻe o ka log log. Loaʻa iā mākou ka hopena aʻe:

mea hao wale

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Ua ola!!!

E nānā kākou i ka ʻāpana ʻāpana. E like me kā mākou e ʻike ai, ua hana ʻia kahi kumuhana me ka inoa a mākou i koho ai i ke code, ka helu paʻamau o nā ʻāpana (8, lawe ʻia mai kumuhana_partitions - ka hoʻohālikelike mea noiʻi), no ka mea ʻaʻole mākou i kuhikuhi i kahi waiwai pilikino no kā mākou kumuhana (ma o nā ʻāpana). Hāʻawi ʻia ka mea hana i hoʻokuʻu ʻia i ka mea hana i nā ʻāpana 8 a pau, ʻoiai ʻo ia wale nō, akā e kūkākūkā ʻia kēia ma ka ʻāpana e pili ana i ka clustering.

ʻAe, hiki iā mākou ke hele i kahi puka makani ʻē aʻe a hoʻouna i kahi leka i kā mākou kumuhana:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS hoʻohana @ hōʻike mākou e hoʻouna ana mākou i kahi leka i kahi kumuhana i kapa ʻia "collect_securities".

I kēia hihia, hele ka memo i ka ʻāpana 6 - hiki iā ʻoe ke nānā i kēia ma ka hele ʻana i kafdrop ma localhost:9000

Ke hele nei i ka puka makani me kā mākou mea hana, e ʻike mākou i kahi leka hauʻoli i hoʻouna ʻia me ka loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Hiki iā mākou ke nānā i loko o ka mongo (e hoʻohana ana iā Robo3T a i ʻole Studio3T) a ʻike i nā palekana i loko o ka waihona:

ʻAʻole wau he billionaire, a no laila ua ʻoluʻolu mākou i ke koho ʻike mua.

Nā hana hope ma Faust, Māhele II: Nā Agena a me nā PūʻuluNā hana hope ma Faust, Māhele II: Nā Agena a me nā Pūʻulu

Ka hauʻoli a me ka hauʻoli - ua mākaukau ka luna mua :)

Mākaukau ka ʻelele, e ola ka ʻelele hou!

ʻAe, e nā keonimana, ua uhi wale mākou i ka 1/3 o ke ala i hoʻomākaukau ʻia e kēia ʻatikala, akā mai hoʻonāwaliwali, no ka mea i kēia manawa e maʻalahi.

No laila, pono mākou i kahi luna e hōʻiliʻili i ka ʻike meta a hoʻokomo i loko o kahi palapala hōʻiliʻili:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

No ka mea e hoʻoponopono kēia luna i ka ʻike e pili ana i kahi palekana kikoʻī, pono mākou e hōʻike i ka ticker (hōʻailona) o kēia palekana ma ka leka. No kēia kumu i faus aia Records - nā papa e haʻi ana i ka hoʻolālā memo ma ke kumuhana ʻelele.

Ma keia hihia, e hele kakou i records.py a wehewehe i ke ʻano o ka memo no kēia kumuhana:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

E like me kāu i manaʻo ai, hoʻohana ʻo faust i ka python type annotation e wehewehe i ka schema memo, ʻo ia ke kumu i kākoʻo ʻia ai ka mana liʻiliʻi e ka waihona. 3.6.

E hoʻi kāua i ka luna, hoʻonohonoho i nā ʻano a hoʻohui:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

E like me kāu e ʻike ai, hāʻawi mākou i kahi ʻāpana hou me kahi hoʻolālā i ke ʻano kumu hoʻomaka kumuhana - value_type. Eia kekahi, pili nā mea a pau i ka hoʻolālā like, no laila ʻaʻole wau i ʻike i kahi mea e noho ai i kekahi mea ʻē aʻe.

ʻAe, ʻo ka paʻi hope loa e hoʻohui i kahi kelepona i ka ʻikepili hōʻiliʻili ʻike meta e collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Hoʻohana mākou i ka papahana i hoʻolaha mua ʻia no ka memo. I kēia hihia, ua hoʻohana au i ke ʻano .cast no ka mea ʻaʻole pono mākou e kali i ka hopena mai ka luna, akā pono e haʻi aku i kēlā. ala e hoʻouna i kahi leka i ke kumuhana:

  1. hoʻolei - ʻaʻole pāpā ʻia no ka mea ʻaʻole manaʻo i kahi hopena. ʻAʻole hiki iā ʻoe ke hoʻouna i ka hopena i kekahi kumuhana ʻē aʻe ma ke ʻano he memo.

  2. hoʻouna - ʻaʻole ia e ālai no ka mea ʻaʻole ia e manaʻo i kahi hopena. Hiki iā ʻoe ke kuhikuhi i kahi ʻelele ma ke kumuhana e hele ai ka hopena.

  3. nīnau - kali i kahi hopena. Hiki iā ʻoe ke kuhikuhi i kahi ʻelele ma ke kumuhana e hele ai ka hopena.

No laila, ʻo ia wale nō me nā ʻelele no kēia lā!

Ka Hui moe

ʻO ka mea hope aʻu i hoʻohiki ai e kākau ma kēia ʻāpana he kauoha. E like me ka mea i ʻōlelo ʻia ma mua, ʻo nā kauoha ma fau he mea hoʻopili a puni ke kaomi. ʻO ka ʻoiaʻiʻo, hoʻopili wale ʻo faust i kā mākou kauoha maʻamau i kāna interface ke kuhikuhi ʻana i ke kī -A

Ma hope o nā ʻelele i hoʻolaha ʻia ma agents.py hoʻohui i kahi hana me ka mea hoʻonani app.commandkahea ana i ke ano hoʻolei у collect_securitites:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

No laila, inā mākou e kāhea i ka papa inoa o nā kauoha, aia kā mākou kauoha hou i loko:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Hiki iā mākou ke hoʻohana e like me nā mea ʻē aʻe, no laila e hoʻomaka hou i ka mea hana faust a hoʻomaka i kahi hōʻiliʻili piha o nā palekana:

> faust -A horton.agents start-collect-securities

He aha ka hope?

Ma ka ʻaoʻao aʻe, me ka hoʻohana ʻana i nā mea i koe e like me ka laʻana, e noʻonoʻo mākou i ke ʻano o ka sink no ka ʻimi ʻana i nā mea koʻikoʻi i nā kumukūʻai pani o ke kālepa no ka makahiki a me ka hoʻomaka ʻana o ka cron o nā mea hana.

ʻO ia wale nō i kēia lā! Mahalo no ka heluhelu ʻana :)

Code no kēia ʻāpana

Nā hana hope ma Faust, Māhele II: Nā Agena a me nā Pūʻulu

PS Ma lalo o ka ʻāpana hope ua nīnau ʻia au e pili ana i ka faust and confluent kafka (he aha nā hiʻohiʻona i loaʻa i ka confluent?). Me he mea lā ʻoi aku ka hana o ka confluent ma nā ʻano he nui, akā ʻo ka ʻoiaʻiʻo ʻaʻole i loaʻa i ka faust ke kākoʻo piha o ka mea kūʻai aku no ka confluent - eia kēia mai ka nā wehewehe ʻana o nā kaohi o nā mea kūʻai aku ma ka doc.

Source: www.habr.com

Pākuʻi i ka manaʻo hoʻopuka