በFaust ላይ የበስተጀርባ ተግባራት፣ ክፍል II፡ ወኪሎች እና ቡድኖች

በFaust ላይ የበስተጀርባ ተግባራት፣ ክፍል II፡ ወኪሎች እና ቡድኖች

ማውጫ

  1. ክፍል አንድ፡ መግቢያ

  2. ክፍል II: ወኪሎች እና ቡድኖች

እዚህ ምን እየሰራን ነው?

ስለዚህ, ሁለተኛው ክፍል. ቀደም ሲል እንደተፃፈው, በውስጡ የሚከተሉትን እናደርጋለን.

  1. የምንፈልጋቸውን የመጨረሻ ነጥቦችን በመጠየቅ በaiohttp ላይ ለፊደል የሚሆን ትንሽ ደንበኛን እንፃፍ።

  2. በእነሱ ላይ የደህንነት እና የሜታ መረጃን የሚሰበስብ ወኪል እንፍጠር።

ግን ይህ እኛ ለፕሮጀክቱ ራሱ የምናደርገው ነው ፣ እና ከተሳሳተ ምርምር አንፃር ፣ ክስተቶችን ከካፍ የሚያስተላልፉ ወኪሎችን እንዴት እንደሚጽፉ ፣ እንዲሁም ትዕዛዞችን እንዴት እንደሚጽፉ (ጠቅታ መጠቅለያ) ፣ በእኛ ሁኔታ - እንማራለን ። ወኪሉ ወደሚከታተለው ርዕስ በእጅ የሚገፋ መልእክት።

ዝግጅት

የአልፋቫንቴጅ ደንበኛ

በመጀመሪያ፣ ለፊደል ጥያቄዎች ትንሽ የaiohttp ደንበኛ እንፃፍ።

አልፋቫንቴጅ.py

አጣሚ

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

በእውነቱ ፣ ሁሉም ነገር ከእሱ ግልፅ ነው-

  1. የአልፋቫንቴጅ ኤፒአይ ቀላል እና በሚያምር ሁኔታ የተነደፈ ነው፣ ስለዚህ ሁሉንም ጥያቄዎች በዘዴ ለማቅረብ ወሰንኩ። construct_query በተራው ደግሞ http ጥሪ አለ.

  2. ሁሉንም መስኮች አመጣለሁ snake_case ለምቾት።

  3. እንግዲህ፣ logger.catch ጌጥ ለቆንጆ እና መረጃ ሰጪ የመከታተያ ውጤት።

PS ወደ config.yml የሆሄያት ቶከንን በአገር ውስጥ ማከል ወይም የአካባቢ ተለዋዋጭ ወደ ውጭ መላክን አይርሱ HORTON_SERVICE_APIKEY. ማስመሰያ እንቀበላለን እዚህ.

CRUD ክፍል

ስለ ደህንነቶች ሜታ መረጃን ለማከማቸት የዋስትናዎች ስብስብ ይኖረናል።

የውሂብ ጎታ/security.py

በእኔ አስተያየት, እዚህ ምንም ነገር ማብራራት አያስፈልግም, እና የመሠረት ክፍሉ ራሱ በጣም ቀላል ነው.

አግኙ()

ውስጥ የመተግበሪያ ነገር ለመፍጠር ተግባር እንጨምር መተግበሪያ.ፒ

አጣሚ

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

ለአሁኑ በጣም ቀላል የሆነውን የመተግበሪያ ፈጠራ ይኖረናል ፣ ትንሽ ቆይተን እናሰፋዋለን ፣ ግን እርስዎን እየጠበቁ እንዳንቆይ ፣ እዚህ ማጣቀሻዎች ወደ መተግበሪያ-ክፍል. ለአብዛኛዎቹ መቼቶች ተጠያቂ ስለሆነ የቅንጅቶችን ክፍል እንድትመለከቱ እመክራችኋለሁ።

ዋናው ክፍል

የዋስትናዎች ዝርዝርን ለመሰብሰብ እና ለማቆየት ወኪል

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

ስለዚህ ፣ በመጀመሪያ ፈጣን መተግበሪያን እናገኛለን - በጣም ቀላል ነው። በመቀጠል, ለተወካያችን አንድ ርዕስ በግልፅ እናውጃለን ... እዚህ ምን እንደሆነ, የውስጥ መለኪያው ምን እንደሆነ እና ይህ በተለየ መንገድ እንዴት እንደሚስተካከል መጥቀስ ተገቢ ነው.

  1. በካፍካ ውስጥ ያሉ ርዕሶች, ትክክለኛውን ፍቺ ማወቅ ከፈለግን ማንበብ የተሻለ ነው ጠፍቷል ሰነድ, ወይም ማንበብ ይችላሉ ማጠቃለያ ሁሉም ነገር በትክክል በሚንጸባረቅበት በሩሲያኛ ሀበሬ ላይ :)

  2. መለኪያ ውስጣዊ, በፋስት ዶክ ውስጥ በደንብ የተገለፀው, ርዕሱን በቀጥታ በኮዱ ውስጥ እንድናዋቅር ያስችለናል, በእርግጥ ይህ ማለት በፋስት ገንቢዎች የቀረቡትን መለኪያዎች ለምሳሌ ማቆየት, ማቆየት ፖሊሲ (በነባሪ ሰርዝ, ነገር ግን ማቀናበር ይችላሉ). የተጠጋጋበአንድ ርዕስ ክፍልፋዮች ብዛት (ክፋዮችማድረግ, ለምሳሌ, ያነሰ ዓለም አቀፋዊ ጠቀሜታ መተግበሪያዎች ፈጣን)።

  3. በአጠቃላይ ተወካዩ በአለምአቀፍ እሴቶች የሚተዳደር ርዕስ መፍጠር ይችላል, ሆኖም ግን, ሁሉንም ነገር በግልፅ ማወጅ እፈልጋለሁ. በተጨማሪም፣ በወኪል ማስታወቂያ ውስጥ ያለው ርዕስ አንዳንድ መለኪያዎች (ለምሳሌ፣ የክፍሎች ብዛት ወይም የማቆያ ፖሊሲ) ሊዋቀሩ አይችሉም።

    ርዕሱን በእጅ ሳይገልጹ ምን ሊመስል እንደሚችል እነሆ፡-

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

ደህና፣ አሁን ወኪላችን ምን እንደሚያደርግ እንገልፃለን :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

ስለዚህ፣ በወኪሉ መጀመሪያ ላይ፣ በደንበኛችን በኩል ለሚቀርቡ ጥያቄዎች የ aiohttp ክፍለ ጊዜ እንከፍተዋለን። ስለዚህ ሰራተኛ ሲጀመር ወኪላችን ሲጀመር አንድ ክፍለ ጊዜ ወዲያውኑ ይከፈታል - አንድ ፣ ሰራተኛው በሚሰራበት ጊዜ ሁሉ (ወይም ብዙ ፣ መለኪያውን ከቀየሩ) ተመሳሳይነት ነባሪ ክፍል ካለው ወኪል)።

በመቀጠል ዥረቱን እንከተላለን (መልእክቱን እናስቀምጣለን _እኛ በዚህ ወኪል ውስጥ ስለ ይዘቱ ግድ ስለሌለው) ከርዕሰ ጉዳያችን የሚመጡ መልዕክቶች አሁን ባለው ማካካሻ ላይ ካሉ ፣ አለበለዚያ ዑደታችን እስኪደርሱ ድረስ ይጠብቃል። ደህና፣ በእኛ ሉፕ ውስጥ፣ የመልእክቱን ደረሰኝ ውስጥ እናስገባለን፣ የገባሪዎችን ዝርዝር አግኝተናል (get_securities በነባሪ ገቢር ብቻ ነው፣ የደንበኛ ኮድን ይመልከቱ) ደህንነቶች እና ወደ ዳታቤዝ እናስቀምጠዋለን፣ ተመሳሳይ ምልክት ያለው ደህንነት መኖሩን እናረጋግጣለን። በመረጃ ቋቱ ውስጥ መለዋወጥ , ካለ, ከዚያም (ወረቀቱ) በቀላሉ ይዘምናል.

ፈጠራችንን እንጀምር!

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS ባህሪያት የድር አካል በአንቀጾቹ ውስጥ መጥፎ ግምት ውስጥ አልገባም, ስለዚህ ተገቢውን ባንዲራ አዘጋጅተናል.

በእኛ የማስጀመሪያ ትእዛዝ፣ የመተግበሪያውን ነገር የት መፈለግ እንዳለብን እና ምን ማድረግ እንዳለብን (ሰራተኛ ማስጀመር) በመረጃ ምዝግብ ማስታወሻ ውፅዓት ደረጃ ነግረናል። የሚከተለውን ውጤት እናገኛለን:

አጣሚ

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

ህያው ነው!!!

የክፍልፋይ ስብስብን እንመልከት. እንደምናየው፣ በኮዱ ውስጥ የመረጥነውን ስም፣ ነባሪ የክፍሎች ቁጥር ያለው ርዕስ ተፈጠረ (8፣ ከ የተወሰደ ርዕስ_ክፍልፋዮች ለርዕሰ ጉዳያችን (በክፍልፋዮች) የግለሰብ እሴትን ስላልገለፅን የመተግበሪያ ግቤት መለኪያ በሠራተኛው ውስጥ የጀመረው ወኪል እሱ ብቻ ስለሆነ ሁሉም 8 ክፍሎች ተመድበዋል ፣ ግን ይህ ስለ ክላስተር በክፍል ውስጥ በዝርዝር ይብራራል።

ደህና፣ አሁን ወደ ሌላ ተርሚናል መስኮት ሄደን ባዶ መልእክት ወደ ርዕሳችን መላክ እንችላለን፡-

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

ፒኤስ በመጠቀም @ "የስብስብ_ደህንነቶች" ወደተባለ ርዕስ መልእክት እየላክን መሆኑን እናሳያለን።

በዚህ አጋጣሚ መልእክቱ ወደ ክፍል 6 ሄዷል - ወደ kafdrop በመሄድ ይህንን ማረጋገጥ ይችላሉ localhost:9000

ከሰራተኞቻችን ጋር ወደ ተርሚናል መስኮት ስንሄድ ሎጉሩ በመጠቀም የተላከ አስደሳች መልእክት እናያለን፡-

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

እንዲሁም ወደ ሞንጎ (Robo3T ወይም Studio3T በመጠቀም) መመልከት እንችላለን እና ዋስትናዎቹ በመረጃ ቋቱ ውስጥ እንዳሉ ማየት እንችላለን፡

እኔ ቢሊየነር አይደለሁም, እና ስለዚህ በመጀመሪያው የመመልከቻ አማራጭ ረክተናል.

በFaust ላይ የበስተጀርባ ተግባራት፣ ክፍል II፡ ወኪሎች እና ቡድኖችበFaust ላይ የበስተጀርባ ተግባራት፣ ክፍል II፡ ወኪሎች እና ቡድኖች

ደስታ እና ደስታ - የመጀመሪያው ወኪል ዝግጁ ነው :)

ወኪል ተዘጋጅቷል፣ አዲሱ ወኪል ይድረስ!

አዎን, ክቡራን, በዚህ ጽሑፍ የተዘጋጀውን መንገድ 1/3 ብቻ ሸፍነናል, ነገር ግን ተስፋ አትቁረጥ, ምክንያቱም አሁን ቀላል ይሆናል.

ስለዚህ አሁን ሜታ መረጃን የሚሰበስብ እና ወደ መሰብሰቢያ ሰነድ የሚያስቀምጥ ወኪል እንፈልጋለን፡-

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

ይህ ወኪል ስለ አንድ የተወሰነ ደህንነት መረጃ ስለሚያስኬድ፣ በመልእክቱ ውስጥ የዚህን ደህንነት ምልክት (ምልክት) መጠቆም አለብን። ለዚህ ዓላማ በፋስት ውስጥ አሉ መዛግብት - በተወካይ ርዕስ ውስጥ የመልእክቱን እቅድ የሚያውጁ ክፍሎች።

በዚህ ሁኔታ, ወደ እንሂድ መዝገቦች.py እና የዚህ ርዕስ መልእክት ምን መምሰል እንዳለበት ግለጽ።

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

እርስዎ እንደገመቱት ፋስት የመልእክቱን እቅድ ለመግለፅ የpython አይነት ማብራሪያን ይጠቀማል፣ ለዚህም ነው በቤተ-መጽሐፍት የሚደገፈው አነስተኛው ስሪት የሆነው። 3.6.

ወደ ወኪሉ እንመለስ፣ ዓይነቶችን እናዘጋጃለን እና እንጨምረው፡-

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

እንደሚመለከቱት ፣ አዲስ ግቤት ከእቅድ ጋር ወደ ርዕስ ማስጀመሪያ ዘዴ - value_type እናስተላልፋለን። በተጨማሪም, ሁሉም ነገር አንድ አይነት እቅድ ይከተላል, ስለዚህ በሌላ ነገር ላይ ለመኖር ምንም ፋይዳ አይታየኝም.

ደህና፣ የመጨረሻው ንክኪ ወደ ሜታ መረጃ መሰብሰቢያ ወኪል ጥሪን ወደ መሰብሰብ_ሴኩሪቲስ ማከል ነው።

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

ለመልእክቱ ቀደም ሲል የታወጀውን እቅድ እንጠቀማለን. በዚህ አጋጣሚ፣ ከተወካዩ የተገኘውን ውጤት መጠበቅ ስለማያስፈልገን የ .cast ዘዴን ተጠቀምኩ፣ ነገር ግን ያንን መጥቀስ ተገቢ ነው። መንገዶች ወደ ርዕሰ ጉዳዩ መልእክት ይላኩ፡-

  1. ውሰድ - ውጤትን ስለማይጠብቅ አያግድም. ውጤቱን እንደ መልእክት ወደ ሌላ ርዕስ መላክ አይችሉም።

  2. መላክ - ውጤቱን ስለማይጠብቅ አያግድም. ውጤቱ በሚሄድበት ርዕስ ውስጥ ወኪልን መግለጽ ይችላሉ.

  3. ይጠይቁ - ውጤቱን ይጠብቃል. ውጤቱ በሚሄድበት ርዕስ ውስጥ ወኪልን መግለጽ ይችላሉ.

ስለዚህ፣ ለዛሬ በወኪሎች ያ ብቻ ነው!

የህልም ቡድን

በዚህ ክፍል ለመጻፍ ቃል የገባሁት የመጨረሻው ነገር ትዕዛዞችን ነው። ቀደም ሲል እንደተገለፀው በፋስት ውስጥ ያሉ ትዕዛዞች በጠቅታ ዙሪያ መጠቅለያ ናቸው። እንደውም ፋስት የ -A ቁልፍን ሲገልፅ በቀላሉ ብጁ ትዕዛዛችንን ከበይነገጽ ጋር ያያይዘዋል

ከታወቁት ወኪሎች በኋላ ወኪሎች.py ከጌጣጌጥ ጋር አንድ ተግባር ይጨምሩ app.commandዘዴውን በመጥራት ቀረጠ у ደህንነቶችን ሰብስብ:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

ስለዚህ፣ የትዕዛዞቹን ዝርዝር ከጠራን፣ አዲሱ ትዕዛዛችን በውስጡ ይሆናል።

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

እንደማንኛውም ሰው ልንጠቀምበት እንችላለን፣ስለዚህ ፈጣን ሰራተኛውን እንደገና እናስጀምር እና የተሟላ የደህንነት ስብስብ እንጀምር፡-

> faust -A horton.agents start-collect-securities

ቀጥሎ ምን ይሆናል?

በሚቀጥለው ክፍል የቀሩትን ወኪሎች እንደ ምሳሌ በመጠቀም በዓመቱ የግብይት ዋጋ መዝጊያ ላይ ጽንፍ የመፈለግ ዘዴን እና የወኪሎችን ክሮን ማስጀመርን እንመለከታለን።

ለዛሬ ያ ብቻ ነው! ስላነበቡ እናመሰግናለን :)

ለዚህ ክፍል ኮድ

በFaust ላይ የበስተጀርባ ተግባራት፣ ክፍል II፡ ወኪሎች እና ቡድኖች

PS በመጨረሻው ክፍል ስር ስለ ፋስት እና ስለ ካፍካ ተጠየቅ (ድብልቅ ምን ዓይነት ገጽታዎች አሉት?). ኮንፍሉንት በብዙ መንገዶች የበለጠ የሚሰራ ይመስላል ፣ ግን እውነታው ግን ፋስት ለተደባለቀ የደንበኛ ድጋፍ የለውም - ይህ ከ በሰነዱ ውስጥ የደንበኛ ገደቦች መግለጫዎች.

ምንጭ: hab.com

አስተያየት ያክሉ