Zadania wprowadzające dotyczące Fausta, część I: Wprowadzenie

Zadania wprowadzające dotyczące Fausta, część I: Wprowadzenie

Jak to się stało, że tak żyję?

Niedawno musiałem pracować na backendzie mocno obciążonego projektu, w którym konieczne było zorganizowanie regularnej realizacji dużej liczby zadań w tle ze złożonymi obliczeniami i żądaniami usług stron trzecich. Projekt jest asynchroniczny i zanim przyszedłem, miał prosty mechanizm uruchamiania zadań cron: pętla sprawdzająca aktualny czas i uruchamiająca grupy współprogramów poprzez zbieranie - to podejście okazało się akceptowalne, dopóki nie było dziesiątek i setek takich współprogramów jednak gdy ich liczba przekroczyła dwa tysiące, musiałem pomyśleć o zorganizowaniu normalnej kolejki zadań z brokerem, kilkoma pracownikami i tak dalej.

Najpierw zdecydowałam się wypróbować Seler, którego używałam już wcześniej. Ze względu na asynchroniczny charakter projektu zagłębiłem się w pytanie i zobaczyłem статьюjak również projekt, stworzony przez autora artykułu.

Powiem tak, projekt jest bardzo ciekawy i sprawdza się całkiem nieźle w innych aplikacjach naszego zespołu, a sam autor twierdzi, że udało mu się go wdrożyć do produkcji, korzystając z puli asynchronicznej. Ale niestety, jak się okazało, nie do końca mi to odpowiadało problem z grupowym uruchomieniem zadań (patrz. grupa). W momencie pisania problem jest już zamknięty, jednak prace trwają już od miesiąca. W każdym razie życzę powodzenia autorowi i wszystkiego najlepszego, skoro w bibliotece już działają rzeczy... ogólnie rzecz biorąc, chodzi o mnie, a narzędzie okazało się dla mnie wilgotne. Dodatkowo niektóre zadania miały 2-3 żądania http do różnych usług, więc nawet przy optymalizacji zadań tworzymy 4 tysiące połączeń TCP mniej więcej co 2 godziny - niezbyt dobrze... Chciałbym utworzyć sesję dla jednego typu zadanie przy uruchamianiu pracowników. Trochę więcej o dużej liczbie żądań za pośrednictwem aiohttp tutaj.

Pod tym kątem zacząłem szukać alternatywy i znalazłem! Twórcy selera, konkretnie, jak ja to rozumiem Zapytaj Solema, powstał Faust, pierwotnie dla projektu Robinhood. Faust inspiruje się Kafką Streams i współpracuje z Kafką jako brokerem, rocksdb służy również do przechowywania wyników z pracy agentów, a najważniejsze jest to, że biblioteka jest asynchroniczna.

Możesz też popatrzeć szybkie porównanie seler i faust od twórców tych ostatnich: ich różnice, różnice między brokerami, realizacja elementarnego zadania. Wszystko jest dość proste, jednak uwagę zwraca fajna funkcja w fauście - wpisane dane do przesłania do tematu.

Co robimy?

Dlatego w krótkiej serii artykułów pokażę, jak zbierać dane z zadań w tle za pomocą Fausta. Źródłem naszego przykładowego projektu będzie, jak sama nazwa wskazuje, alphavantage.co. Pokażę jak pisać agenty (sink, tematy, partycje), jak wykonywać regularne (cron) wykonywanie, najwygodniejsze polecenia faust cli (wrapper over click), proste klastrowanie, a na koniec dołączymy datadog ( działa od razu po wyjęciu z pudełka) i spróbuj coś zobaczyć. Do przechowywania zebranych danych użyjemy mongodb i silnika do połączenia.

PS Sądząc po pewności, z jaką napisano punkt o monitoringu, myślę, że czytelnik na końcu ostatniego artykułu nadal będzie wyglądał mniej więcej tak:

Zadania wprowadzające dotyczące Fausta, część I: Wprowadzenie

Wymagania projektu

W związku z tym, co już obiecałem, zróbmy małą listę tego, co usługa powinna potrafić:

  1. Przesyłaj papiery wartościowe i ich przegląd (w tym zyski i straty, bilans, przepływy pieniężne - za ostatni rok) - regularnie
  2. Przesyłaj dane historyczne (dla każdego roku handlowego znajdź ekstremalne wartości ceny zamknięcia transakcji) - regularnie
  3. Regularnie przesyłaj najnowsze dane handlowe
  4. Regularnie przesyłaj spersonalizowaną listę wskaźników dla każdego zabezpieczenia

Zgodnie z oczekiwaniami od podstaw wybieramy nazwę dla projektu: Horton

Przygotowujemy infrastrukturę

Tytuł z pewnością mocny, wystarczy jednak napisać małą konfigurację dla docker-compose z kafką (i zookeeperem - w jednym kontenerze), kafdropem (jeśli chcemy przeglądać wiadomości w tematach), mongodb. Dostajemy [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) w następującej formie:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

Nie ma tu wcale nic skomplikowanego. Dla kafki zadeklarowano dwóch słuchaczy: jednego (wewnętrznego) do użytku wewnątrz sieci kompozytowej i drugiego (zewnętrznego) do żądań z zewnątrz, więc przekazali go na zewnątrz. 2181 — port zookeepera. Reszta wydaje mi się jasna.

Przygotowanie szkieletu projektu

W wersji podstawowej struktura naszego projektu powinna wyglądać następująco:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*Wszystko, co zaobserwowałem Jeszcze tego nie dotykamy, po prostu tworzymy puste pliki.**

Stworzyliśmy strukturę. Teraz dodajmy niezbędne zależności, napiszmy konfigurację i połączmy się z mongodb. Nie będę podawać pełnego tekstu plików w artykule, aby nie opóźniać, ale podam linki do niezbędnych wersji.

Zacznijmy od zależności i meta na temat projektu - pyproject.toml

Następnie zaczynamy instalować zależności i tworzyć virtualenv (można też samemu utworzyć folder venv i aktywować środowisko):

pip3 install poetry (если ещё не установлено)
poetry install

Teraz stwórzmy plik konfiguracyjny.yml - Poświadczenia i gdzie zapukać. Można tam od razu umieścić dane dla alphavantage. Cóż, przejdźmy dalej config.py — wyodrębnij dane dla aplikacji z naszej konfiguracji. Tak, przyznaję, korzystałem z mojej biblioteki - sitri.

Po podłączeniu do Mongo wszystko jest dość proste. ogłoszony klasa klienta połączyć i klasa bazowa dla crudów, aby ułatwić zadawanie zapytań o kolekcje.

Co się później stanie?

Artykuł nie jest zbyt długi, bo tutaj mówię tylko o motywacji i przygotowaniu, więc nie miejcie mi tego za złe – obiecuję, że w następnej części będzie akcja i grafika.

Zatem w następnej części:

  1. Napiszmy małego klienta dla alphavantage na aiohttp z żądaniami dotyczącymi potrzebnych nam punktów końcowych.
  2. Stwórzmy agenta, który będzie zbierał dla nich dane o papierach wartościowych i cenach historycznych.

Kod projektu

Kod dla tej części

Źródło: www.habr.com

Dodaj komentarz