Background tasks on Faust, Part II: Agents and Commands


Table of contents

  1. Part I: Introduction

  2. Part II: Agents and Commands

What are we doing here?

So, the second part. As stated earlier, in it we will do the following:

  1. Write a small aiohttp client for AlphaVantage with requests to the endpoints we need.

  2. Make an agent that will collect data on securities and meta information about them.

But this is what we will do for the project itself; as for the faust research plan, we will learn how to write agents that process a stream of events from Kafka, and how to write commands (a wrapper over click), in our case for manually pushing a message into the topic the agent listens to.

Preparation

AlphaVantage Client

First, let's write a small aiohttp client for alphavantage requests.

alphavantage.py


import urllib.parse as urlparse
from io import StringIO
from typing import Any, Dict, List, Union

import aiohttp
import pandas as pd
import stringcase
from loguru import logger

from horton.config import API_ENDPOINT


class AlphaVantageClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        api_endpoint: str = API_ENDPOINT,
    ):
        self._query_params = {"datatype": "json", "apikey": api_key}
        self._api_endpoint = api_endpoint
        self._session = session

    @logger.catch
    def _format_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
        # Convert AlphaVantage's CamelCase field names to snake_case
        formatted_data = {}

        for field, item in data.items():
            formatted_data[stringcase.snakecase(field)] = item

        return formatted_data

    @logger.catch
    async def _construct_query(
        self, function: str, to_json: bool = True, **kwargs
    ) -> Union[Dict[str, Any], str]:
        # Single entry point for all API calls: every endpoint differs
        # only by the "function" query parameter
        path = "query/"

        async with self._session.get(
            urlparse.urljoin(self._api_endpoint, path),
            params={"function": function, **kwargs, **self._query_params},
        ) as response:
            data = (await response.json()) if to_json else (await response.text())

            if to_json:
                data = self._format_fields(data)

        return data

    @logger.catch
    async def get_securities(self, state: str = "active") -> List[Dict[str, str]]:
        data = await self._construct_query("LISTING_STATUS", state=state, to_json=False)

        data = pd.read_csv(StringIO(data))

        securities = data.to_dict("records")

        for index, security in enumerate(securities):
            security = self._format_fields(security)
            security["_type"] = "physical"

            securities[index] = security

        return securities

    @logger.catch
    async def get_security_overview(self, symbol: str) -> Dict[str, str]:
        return await self._construct_query("OVERVIEW", symbol=symbol)

    @logger.catch
    async def get_historical_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query(
            "TIME_SERIES_DAILY_ADJUSTED", symbol=symbol, outputsize="full"
        )

    @logger.catch
    async def get_last_price_data(self, symbol: str) -> Dict[str, Any]:
        return await self._construct_query("GLOBAL_QUOTE", symbol=symbol)

    @logger.catch
    async def get_indicator_data(
        self, symbol: str, indicator: str, **indicator_options
    ) -> Dict[str, Any]:
        return await self._construct_query(
            indicator, symbol=symbol, **indicator_options
        )

Everything here is fairly self-explanatory:

  1. The AlphaVantage API is quite simple and beautifully designed, so I decided to pass all requests through the _construct_query method, which in turn makes the HTTP call.

  2. I convert all fields to snake_case for convenience (see the example after this list).

  3. And the logger.catch decorator is there for beautiful and informative traceback output.
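As a quick illustration of the snake_case point, here is what stringcase does to some typical AlphaVantage field names (a hypothetical REPL session; the field names are examples of what the OVERVIEW endpoint returns):

import stringcase

stringcase.snakecase("Symbol")                # 'symbol'
stringcase.snakecase("MarketCapitalization")  # 'market_capitalization'
stringcase.snakecase("DividendYield")         # 'dividend_yield'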

PS Do not forget to locally add the AlphaVantage token to config.yml, or to export the HORTON_SERVICE_APIKEY environment variable. You can get a token here.
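The config module itself is not shown in this part; for orientation, a minimal sketch of what horton/config.py might export (the real project also reads config.yml, and the broker variable name below is my assumption; only API_ENDPOINT, API_KEY and KAFKA_BROKERS are actually imported elsewhere):

import os

API_ENDPOINT = "https://www.alphavantage.co/"

# The env var mentioned above; the real project may fall back to config.yml
API_KEY = os.environ["HORTON_SERVICE_APIKEY"]

# Hypothetical variable name; the worker output below shows this default broker
KAFKA_BROKERS = os.environ.get("HORTON_KAFKA_BROKERS", "kafka://localhost:9092")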

CRUD class

We will have a securities collection for storing meta information about securities.

database/security.py

In my opinion, there is nothing to explain here, and the base class itself is quite simple.
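The file is not reproduced in the article, so here is a minimal sketch of what database/security.py might look like, assuming motor (the async MongoDB driver). The connection setting name is hypothetical; only update_one with this filter/document/upsert signature is actually used below:

from motor.motor_asyncio import AsyncIOMotorClient

from horton.config import MONGO_DSN  # hypothetical setting name

client = AsyncIOMotorClient(MONGO_DSN)
collection = client.horton.securities  # the securities collection


class SecurityCRUD:
    @classmethod
    async def update_one(cls, query: dict, document: dict, upsert: bool = False):
        # $set updates only the given fields; upsert inserts if nothing matches
        return await collection.update_one(query, {"$set": document}, upsert=upsert)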

get_app()

Let's add a function for creating the application object to app.py:


import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App("horton", broker=KAFKA_BROKERS)

For now this is the simplest possible application creation; we will expand it a little later. However, so as not to keep you waiting, here are references to the App class. I also advise you to take a look at the Settings class, since it is responsible for most of the settings.
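To make the "expand it later" remark concrete, here is the kind of thing the Settings class lets us pass into faust.App; topic_partitions and store are real faust settings, but these particular values are only illustrative, not what the project necessarily ends up using:

import faust

from horton.config import KAFKA_BROKERS


def get_app():
    return faust.App(
        "horton",
        broker=KAFKA_BROKERS,
        topic_partitions=8,   # default partition count for app-created topics
        store="memory://",    # table storage backend; memory is the default
    )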

Main part

Agent for collecting and maintaining the list of securities

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    pass

So, first we get the faust application object, which is quite simple. Next, we explicitly declare a topic for our agent... Here it is worth mentioning what a topic is, what the internal parameter does, and how things can be arranged differently.

  1. For the exact definition of topics in Kafka, it is better to read the official doc, or you can read the summary on Habré in Russian, where everything is also accurately covered 🙂

  2. The internal parameter, well described in the faust doc, allows us to configure the topic directly in the code (of course, only with the parameters provided by the faust developers), for example: retention, retention policy (delete by default, but you can also set compact), and the number of partitions per topic (partitions, for example to make it smaller than the global value of the faust application).

  3. In general, an agent can create a managed topic with the global values by itself; however, I like to declare everything explicitly. Besides, some parameters of a topic (for example, the number of partitions or the retention policy) cannot be configured in the agent declaration.

    Here's what it might look like without manually defining the topic (a sketch of the opposite, a fully explicit declaration, follows right after this example):

app = get_app()

@app.agent()
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    pass
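And here, for contrast, is a sketch of a fully explicit topic declaration; partitions, retention and compacting are real app.topic() parameters, while the values are purely illustrative:

collect_securities_topic = app.topic(
    "collect_securities",
    internal=True,
    partitions=4,       # fewer than the application-wide topic_partitions
    retention=86400.0,  # keep messages for one day (in seconds)
    compacting=False,   # delete policy; set True for a compacted topic
)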

Well, now, let's describe what our agent will do 🙂

from typing import AsyncIterable

import aiohttp
from faust.types import StreamT
from loguru import logger

# Project-local imports; module paths follow the project layout,
# and API_KEY is assumed to live in horton.config
from horton.alphavantage import AlphaVantageClient
from horton.app import get_app
from horton.config import API_KEY
from horton.database.security import SecurityCRUD

app = get_app()

collect_securities_topic = app.topic("collect_securities", internal=True)

@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for _ in stream:
            logger.info("Start collect securities")

            client = AlphaVantageClient(session, API_KEY)

            securities = await client.get_securities()

            for security in securities:
                await SecurityCRUD.update_one(
                    {"symbol": security["symbol"], "exchange": security["exchange"]},
                    security,
                    upsert=True,
                )

            yield True

So, at the beginning of the agent we open an aiohttp session for requests through our client. Thus, when the worker starts and our agent is launched, a session is opened immediately: one for the entire time the worker runs (or several, if you change the agent's concurrency parameter from its default value of one).
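For reference, here is where that concurrency parameter would go; a hedged sketch, not something our project actually does (each of the two agent instances would then open its own session):

@app.agent(collect_securities_topic, concurrency=2)  # two concurrent instances
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
    ...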

Next, we iterate over the stream of messages from our topic (each message goes into _, since in this agent we do not care about its content). If messages are present at the current offset, we process them; otherwise the loop waits for them to arrive. Inside the loop we log the arrival of a message, fetch the list of active securities (get_securities returns only active ones by default, see the client code), and save them to the database, checking whether a security with the same ticker and exchange already exists there; if it does, it (the security) will simply be updated.

Let's start our creation!

> docker-compose up -d
... starting containers ...
> faust -A horton.agents worker --without-web -l info

PS I will not cover faust's web component in these articles, so we set the appropriate flag.

In the launch command we told faust where to look for the application object and what to do with it (start a worker), with the info logging level. We get the following output:


┌ƒaµS† v1.10.4┬───────────────────────────────────────────────────┐
│ id          │ horton                                            │
│ transport   │ [URL('kafka://localhost:9092')]                   │
│ store       │ memory:                                           │
│ log         │ -stderr- (info)                                   │
│ pid         │ 1271262                                           │
│ hostname    │ host-name                                         │
│ platform    │ CPython 3.8.2 (Linux x86_64)                      │
│ drivers     │                                                   │
│   transport │ aiokafka=1.1.6                                    │
│   web       │ aiohttp=3.6.2                                     │
│ datadir     │ /path/to/project/horton-data                      │
│ appdir      │ /path/to/project/horton-data/v1                   │
└─────────────┴───────────────────────────────────────────────────┘
... logs, logs, logs ...

┌Topic Partition Set─────────┬────────────┐
│ topic                      │ partitions │
├────────────────────────────┼────────────┤
│ collect_securities         │ {0-7}      │
│ horton-__assignor-__leader │ {0}        │
└────────────────────────────┴────────────┘ 

It's alive!!!

Let's look at the partition set. As we can see, a topic was created with the name we specified in the code, with the default number of partitions (8, taken from topic_partitions, a parameter of the application object), since we did not specify an individual value for our topic (via partitions). The running agent in the worker is assigned all 8 partitions, since it is the only one; this will be discussed in more detail in the part about clustering.

Well, now we can go to another terminal window and send an empty message to our topic:

> faust -A horton.agents send @collect_securities
{"topic": "collect_securities", "partition": 6, "topic_partition": ["collect_securities", 6], "offset": 0, "timestamp": ..., "timestamp_type": 0}

PS by using @ we indicate that we are sending the message to the agent named "collect_securities" (faust resolves it to the agent's topic).

In this case, the message went to partition 6; this can be checked by going to kafdrop at localhost:9000.

Switching to the terminal window with our worker, we will see a happy message logged via loguru:

2020-09-23 00:26:37.304 | INFO     | horton.agents:collect_securities:40 - Start collect securities

We can also look into mongo (using Robo3T or Studio3T) and see the securities in the database:

I am not a billionaire, so we are content with the first viewing option.


Happiness and joy - the first agent is ready 🙂

The agent is ready, long live the new agent!

Yes, gentlemen, we have covered only 1/3 of the path prepared by this article, but do not be discouraged, because now it will be easier.

So, now we need an agent that collects meta information and puts it into a collection document:

collect_security_overview_topic = app.topic("collect_security_overview", internal=True)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[?],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            ...

Since this agent will process information about a specific security, we need to indicate the ticker (symbol) of that security in the message. For this, faust has Records: classes declaring the message schema for the agent's topic.

So, let's go to records.py and describe what the message for this topic should look like:

import faust


class CollectSecurityOverview(faust.Record):
    symbol: str
    exchange: str

As you might have guessed, faust uses Python type annotations to describe the message schema, which is why the minimum version supported by the library is 3.6.
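Since a Record is an ordinary model class, it is easy to poke at in isolation. A small hypothetical check (the exact JSON layout, including faust's metadata field, may differ between versions):

msg = CollectSecurityOverview(symbol="AAPL", exchange="NASDAQ")

msg.symbol    # 'AAPL'
msg.exchange  # 'NASDAQ'
msg.dumps()   # JSON bytes, e.g. b'{"symbol": "AAPL", "exchange": "NASDAQ", ...}'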

Let's go back to the agent, set the types, and complete it:

collect_security_overview_topic = app.topic(
    "collect_security_overview", internal=True, value_type=CollectSecurityOverview
)


@app.agent(collect_security_overview_topic)
async def collect_security_overview(
    stream: StreamT[CollectSecurityOverview],
) -> AsyncIterable[bool]:
    async with aiohttp.ClientSession() as session:
        async for event in stream:
            logger.info(
                "Start collect security [{symbol}] overview", symbol=event.symbol
            )

            client = AlphaVantageClient(session, API_KEY)

            security_overview = await client.get_security_overview(event.symbol)

            await SecurityCRUD.update_one(
                {"symbol": event.symbol, "exchange": event.exchange},
                security_overview,
            )

            yield True

As you can see, we pass a new parameter with the schema, value_type, into the topic initialization method. Beyond that, everything follows the same scheme, so I see no point in dwelling on anything else.

Well, the final touch: inside the collect_securities agent, let's add the call for collecting meta information:

....
for security in securities:
    await SecurityCRUD.update_one(
        {"symbol": security["symbol"], "exchange": security["exchange"]},
        security,
        upsert=True,
    )

    await collect_security_overview.cast(
        CollectSecurityOverview(
            symbol=security["symbol"], exchange=security["exchange"]
        )
    )
....

We use the message schema declared earlier. In this case I used the .cast method, since we do not need to wait for a result from the agent, but it is worth mentioning the ways to send a message to an agent's topic (see the sketch after this list):

  1. cast - does not block, because it does not expect a result. You cannot send the result to another topic as a message.

  2. send - does not block, because it does not wait for a result. You can specify an agent in whose topic the result will go.

  3. ask - waits for a result. You can specify an agent in whose topic the result will go.
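For example, if we ever did need the result synchronously, the call would look something like this (hypothetical; our project only uses cast):

# Hypothetical: blocks until the agent yields a value for this message
result = await collect_security_overview.ask(
    CollectSecurityOverview(symbol="AAPL", exchange="NASDAQ")
)
# here `result` would be the True yielded by the agent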

So, that's it with agents for today!

The dream team (commands)

The last thing I promised to write about in this part is commands. As mentioned earlier, commands in faust are a wrapper over click. In fact, faust simply attaches our custom commands to its CLI when the -A switch is specified.

After the declared agents in agents.py, let's add a function with the app.command decorator that calls cast on collect_securities:

@app.command()
async def start_collect_securities():
    """Collect securities and overview."""

    await collect_securities.cast()
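Commands can also carry a payload. For instance, a hypothetical command that triggers overview collection for a single hard-coded security, built only from the pieces we already have:

@app.command()
async def start_collect_apple_overview():
    """Hypothetical: collect the overview for one specific security."""

    await collect_security_overview.cast(
        CollectSecurityOverview(symbol="AAPL", exchange="NASDAQ")
    )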

Thus, if we list the commands, our new command will be among them:

> faust -A horton.agents --help

....
Commands:
  agents                    List agents.
  clean-versions            Delete old version directories.
  completion                Output shell completion to be evaluated by the...
  livecheck                 Manage LiveCheck instances.
  model                     Show model detail.
  models                    List all available models as a tabulated list.
  reset                     Delete local table state.
  send                      Send message to agent/topic.
  start-collect-securities  Collect securities and overview.
  tables                    List available tables.
  worker                    Start worker instance for given app.

We can use it like any other command, so let's restart the faust worker and start a full-fledged collection of securities:

> faust -A horton.agents start-collect-securities

What will happen next?

In the next part, using the remaining agents as an example, we will look at the sink mechanism for searching for extrema in the year's closing prices, and at cron launches of agents.

That's all for today! Thanks for reading 🙂

Code for this part


PS Under the previous part I was asked about faust and Confluent Kafka (what features Confluent has). Confluent does seem more functional, but the fact is that faust does not have full client support for Confluent; this follows from the description of client restrictions in the doc.

Source: habr.com
