Նախապատմական առաջադրանքներ Ֆաուստի վերաբերյալ, Մաս II. Գործակալներ և թիմեր

Նախապատմական առաջադրանքներ Ֆաուստի վերաբերյալ, Մաս II. Գործակալներ և թիմեր

Պահեստավորված նյութեր

  1. Մաս I. Ներածություն

  2. Մաս II. Գործակալներ և թիմեր

Ի՞նչ ենք մենք այստեղ անում։

Այսպիսով, այսպես, երկրորդ մասը: Ինչպես ավելի վաղ գրվել է, դրանում մենք կանենք հետևյալը.

  1. Եկեք գրենք փոքր հաճախորդ alphavantage-ի համար aiohttp-ում՝ մեզ անհրաժեշտ վերջնակետերի հարցումներով:

  2. Եկեք ստեղծենք գործակալ, ով կհավաքի տվյալներ արժեթղթերի և դրանց վերաբերյալ մետա տեղեկատվություն:

Բայց սա այն է, ինչ մենք կանենք հենց նախագծի համար, և ֆաուստ հետազոտության առումով մենք կսովորենք, թե ինչպես գրել գործակալներ, որոնք մշակում են իրադարձությունները կաֆկայից, ինչպես նաև ինչպես գրել հրամաններ (սեղմեք փաթաթան), մեր դեպքում՝ ձեռքով հրում հաղորդագրությունների համար այն թեմային, որը գործակալը վերահսկում է:

Ուսուցում

AlphaVantage հաճախորդ

Նախ, եկեք գրենք փոքրիկ aiohttp հաճախորդ alphavantage-ի հարցումների համար:

alphavantage.py

Spoiler

import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Փաստորեն, դրանից ամեն ինչ պարզ է.

  1. AlphaVantage API-ն բավականին պարզ և գեղեցիկ ձևավորված է, ուստի ես որոշեցի կատարել բոլոր հարցումները մեթոդի միջոցով construct_query որտեղ իր հերթին կա http զանգ.

  2. Ես բերում եմ բոլոր դաշտերը snake_case հարմարավետության համար:

  3. Դե, logger.catch ձևավորումը գեղեցիկ և տեղեկատվական հետքի արդյունքի համար:

Հ.Գ. Չմոռանաք ավելացնել alphavantage նշանը տեղում config.yml-ում կամ արտահանել շրջակա միջավայրի փոփոխականը: HORTON_SERVICE_APIKEY. Մենք ստանում ենք նշան այստեղ.

CRUD դաս

Մենք կունենանք արժեթղթերի հավաքածու՝ արժեթղթերի մասին մետա տեղեկատվությունը պահելու համար:

տվյալների բազա/security.py

Իմ կարծիքով այստեղ որևէ բան բացատրելու կարիք չկա, իսկ բազային դասն ինքնին բավականին պարզ է։

get_app ()

Եկեք հավելվածի օբյեկտ ստեղծելու գործառույթ ավելացնենք App.py

Spoiler

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

Առայժմ մենք կունենանք ամենապարզ հավելվածի ստեղծումը, մի փոքր ուշ այն կընդլայնենք, սակայն ձեզ սպասեցնելու համար այստեղ. հղումներ դեպի App-class: Ես նաև խորհուրդ եմ տալիս դիտել կարգավորումների դասը, քանի որ այն պատասխանատու է պարամետրերի մեծ մասի համար:

Հիմնական մասը

Արժեթղթերի ցուցակի հավաքագրման և պահպանման գործակալ

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Այսպիսով, նախ մենք ստանում ենք faust հավելվածի օբյեկտը, դա բավականին պարզ է: Հաջորդը, մենք հստակորեն հայտարարում ենք թեմա մեր գործակալի համար... Այստեղ արժե նշել, թե ինչ է դա, որն է ներքին պարամետրը և ինչպես դա կարող է տարբեր կերպ դասավորվել:

  1. Թեմաներ կաֆկայում, եթե ուզում ենք իմանալ ճշգրիտ սահմանումը, ավելի լավ է կարդալ անջատված է. փաստաթուղթ, կամ կարող եք կարդալ ամփոփագիր ռուսերեն Habré-ի վրա, որտեղ նույնպես ամեն ինչ բավականին ճշգրիտ է արտացոլված :)

  2. Ներքին պարամետր, որը բավականին լավ նկարագրված է faust doc-ում, թույլ է տալիս մեզ կարգավորել թեման ուղղակիորեն կոդի մեջ, իհարկե, սա նշանակում է faust ծրագրավորողների կողմից տրամադրված պարամետրերը, օրինակ՝ պահպանում, պահպանման քաղաքականություն (կանխադրված ջնջել, բայց կարող եք սահմանել կուռ), բաժինների քանակը մեկ թեմայի համար (միավորներըանել, օրինակ, ավելի քիչ, քան համաշխարհային նշանակություն դիմումները faust):

  3. Ընդհանուր առմամբ, գործակալը կարող է ստեղծել գլոբալ արժեքներով կառավարվող թեմա, այնուամենայնիվ, ես սիրում եմ ամեն ինչ հստակորեն հայտարարել: Բացի այդ, գործակալի գովազդի թեմայի որոշ պարամետրեր (օրինակ՝ բաժանումների քանակը կամ պահպանման քաղաքականությունը) չեն կարող կազմաձևվել:

    Ահա թե ինչ կարող է լինել այն առանց թեման ձեռքով սահմանելու.

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
	pass

Դե, հիմա եկեք նկարագրենք, թե ինչ է անելու մեր գործակալը :)

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
                )

            yield True

Այսպիսով, գործակալի սկզբում մենք բացում ենք aiohttp նիստը մեր հաճախորդի միջոցով հարցումների համար: Այսպիսով, աշխատող սկսելիս, երբ մեր գործակալը գործարկվի, անմիջապես կբացվի նիստ՝ մեկ՝ աշխատողի աշխատանքի ողջ ընթացքում (կամ մի քանիսը, եթե փոխեք պարամետրը զուգահեռություն լռելյայն միավոր ունեցող գործակալից):

Հաջորդը, մենք հետևում ենք հոսքին (մենք տեղադրում ենք հաղորդագրությունը _, քանի որ մենք՝ այս գործակալում, թքած ունենք մեր թեմայի հաղորդագրությունների բովանդակության վրա, եթե դրանք գոյություն ունեն ընթացիկ օֆսեթում, հակառակ դեպքում մեր ցիկլը կսպասի դրանց ժամանմանը: Դե, մեր օղակի ներսում մենք գրանցում ենք հաղորդագրության անդորրագիրը, ստանում ենք ակտիվ (get_securities վերադարձնում է միայն լռելյայն ակտիվ, տես հաճախորդի կոդը) արժեթղթերի ցուցակը և պահում ենք այն տվյալների բազայում՝ ստուգելով, թե արդյոք կա նույն նշանով արժեթուղթ և փոխանակում տվյալների բազայում, եթե կա, ապա այն (թուղթը) պարզապես կթարմացվի:

Եկեք գործարկենք մեր ստեղծագործությունը:

> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info

PS-ի առանձնահատկությունները վեբ բաղադրիչ Հոդվածներում ֆաուստը չեմ համարի, ուստի մենք համապատասխան դրոշ ենք սահմանել։

Մեր գործարկման հրամանում մենք Faust-ին ասացինք, թե որտեղ փնտրել հավելվածի օբյեկտը և ինչ անել դրա հետ (գործարկել աշխատող) տեղեկամատյանի ելքի մակարդակով: Մենք ստանում ենք հետևյալ արդյունքը.

Spoiler

┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... логи, логи, логи ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

Այն ողջ է!!!

Եկեք նայենք բաժանման հավաքածուին: Ինչպես տեսնում ենք, ստեղծվել է մի թեմա, որի անունը մենք նշել ենք կոդում, բաժանումների լռելյայն թիվը (8, վերցված է թեմա_բաժանումներ - հավելվածի օբյեկտի պարամետր), քանի որ մենք չենք նշել անհատական ​​արժեք մեր թեմայի համար (բաժանումների միջոցով): Աշխատողում գործարկված գործակալին նշանակված են բոլոր 8 միջնորմները, քանի որ այն միակն է, բայց դա ավելի մանրամասն կքննարկվի կլաստերավորման մասին մասում։

Դե, հիմա մենք կարող ենք գնալ մեկ այլ տերմինալի պատուհան և դատարկ հաղորդագրություն ուղարկել մեր թեմային.

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS օգտագործելով @ մենք ցույց ենք տալիս, որ մենք հաղորդագրություն ենք ուղարկում «հավաքել_արժեթղթեր» անունով թեմային:

Այս դեպքում հաղորդագրությունը գնաց բաժին 6. կարող եք ստուգել սա՝ անցնելով kafdrop on localhost:9000

Անցնելով տերմինալի պատուհանը մեր աշխատողի հետ, մենք կտեսնենք ուրախ հաղորդագրություն, որը ուղարկվում է loguru-ի միջոցով.

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

Մենք կարող ենք նաև նայել mongo-ին (օգտագործելով Robo3T կամ Studio3T) և տեսնել, որ արժեթղթերը գտնվում են տվյալների բազայում.

Ես միլիարդատեր չեմ, և հետևաբար մենք բավարարվում ենք դիտման առաջին տարբերակով։

Նախապատմական առաջադրանքներ Ֆաուստի վերաբերյալ, Մաս II. Գործակալներ և թիմերՆախապատմական առաջադրանքներ Ֆաուստի վերաբերյալ, Մաս II. Գործակալներ և թիմեր

Երջանկություն և ուրախություն - առաջին գործակալը պատրաստ է :)

Գործակալը պատրաստ է, կեցցե նոր գործակալը:

Այո՛, պարոնայք, մենք այս հոդվածով պատրաստած ճանապարհի միայն 1/3-ն ենք անցել, բայց մի հուսահատվեք, քանի որ հիմա ավելի հեշտ կլինի։

Այսպիսով, այժմ մեզ անհրաժեշտ է գործակալ, որը հավաքում է մետա տեղեկատվություն և տեղադրում այն ​​հավաքածուի փաստաթղթում.

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Քանի որ այս գործակալը կմշակի որոշակի անվտանգության մասին տեղեկատվությունը, մենք պետք է հաղորդագրության մեջ նշենք այս անվտանգության նշանը (խորհրդանիշը): Այս նպատակով ֆաուստում կան Պահեստավորված նյութեր — դասեր, որոնք հայտարարում են հաղորդագրության սխեման գործակալի թեմայում:

Այս դեպքում, եկեք գնանք records.py և նկարագրեք, թե ինչպիսին պետք է լինի այս թեմայի հաղորդագրությունը.

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

Ինչպես կարող էիք կռահել, Ֆաուստը օգտագործում է python տեսակի անոտացիա՝ հաղորդագրության սխեման նկարագրելու համար, ինչի պատճառով գրադարանի կողմից աջակցվող նվազագույն տարբերակը. 3.6.

Եկեք վերադառնանք գործակալին, սահմանենք տեսակները և ավելացնենք այն.

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one({"symbol": event.symbol, "exchange": event.exchange}, security_overview)

            yield True

Ինչպես տեսնում եք, մենք սխեմայով նոր պարամետր ենք փոխանցում թեմայի սկզբնավորման մեթոդին՝ value_type: Ավելին, ամեն ինչ հետևում է նույն սխեմային, ուստի ես որևէ այլ բանի վրա կանգ առնելու իմաստ չեմ տեսնում:

Դե, վերջին հպումը մետա տեղեկատվության հավաքագրման գործակալին զանգ ավելացնելն է՝ collect_securitites:

....
for security in securities:
    await SecurityCRUD.update_one({
            "symbol": security["symbol"],
            "exchange": security["exchange"]
        },
        security,
        upsert = True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol = security["symbol"], exchange = security["exchange"])
    )
....

Հաղորդագրության համար օգտագործում ենք նախկինում հայտարարված սխեման։ Այս դեպքում ես օգտագործեցի .cast մեթոդը, քանի որ պետք չէ սպասել գործակալի արդյունքին, բայց հարկ է նշել, որ. ուղիները հաղորդագրություն ուղարկել թեմային.

  1. cast - չի արգելափակում, քանի որ արդյունք չի ակնկալում: Դուք չեք կարող արդյունքը ուղարկել այլ թեմայի որպես հաղորդագրություն:

  2. ուղարկել - չի արգելափակում, քանի որ արդյունք չի ակնկալում: Թեմայում կարող եք նշել գործակալ, որին կգնա արդյունքը:

  3. հարցնել - սպասում է արդյունքի: Թեմայում կարող եք նշել գործակալ, որին կգնա արդյունքը:

Այսպիսով, այս ամենն այսօր գործակալների հետ է:

Երազանքի թիմը

Վերջին բանը, որ խոստացել եմ գրել այս մասում, հրամաններն են։ Ինչպես նշվեց ավելի վաղ, faust-ի հրամանները սեղմման շուրջ են: Փաստորեն, faust-ը պարզապես կցում է մեր հատուկ հրամանն իր ինտերֆեյսին, երբ նշում է -A ստեղնը

Հայտարարված գործակալների ներսից հետո agents.py ավելացրեք գործառույթ դեկորատորի միջոցով app.commandկոչելով մեթոդը դերաբաշխում у հավաքել_արժեթղթեր:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()

Այսպիսով, եթե մենք կանչենք հրամանների ցանկը, մեր նոր հրամանը կլինի դրանում.

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

Մենք կարող ենք օգտագործել այն բոլորի նման, ուստի եկեք վերագործարկենք faust worker-ը և սկսենք արժեթղթերի ամբողջական հավաքածու.

> faust -A horton.agents start-collect-securities

Ինչ է լինելու հաջորդը.

Հաջորդ մասում, օգտագործելով մնացած գործակալները որպես օրինակ, մենք կքննարկենք տարվա ընթացքում առևտրի փակման գներում ծայրահեղությունների որոնման մեխանիզմը և գործակալների բաց թողարկումը:

Այսքանն է այսօրվա համար: Շնորհակալություն կարդալու համար :)

Կոդ այս մասի համար

Նախապատմական առաջադրանքներ Ֆաուստի վերաբերյալ, Մաս II. Գործակալներ և թիմեր

Հ.Գ.Վերջին մասում ինձ հարցրեցին ֆաուստի և միաձուլվող կաֆկայի մասին (ի՞նչ առանձնահատկություններ ունի կոնֆլենտը:) Թվում է, որ կոնֆլյուենտը շատ առումներով ավելի ֆունկցիոնալ է, բայց փաստն այն է, որ ֆաուստը չունի հոլովակի ամբողջական սպասարկում. սա հետևում է. հաճախորդի սահմանափակումների նկարագրությունները փաստաթղթում.

Source: www.habr.com

Добавить комментарий