Así, así, la segunda parte. Como está escrito anteriormente, en él haremos lo siguiente:
Escribamos un pequeño cliente para alphavantage en aiohttp con solicitudes para los puntos finales que necesitamos.
Creemos un agente que recopilará datos sobre valores y metainformación sobre ellos.
Pero esto es lo que haremos para el proyecto en sí y, en términos de la investigación de Faust, aprenderemos cómo escribir agentes que procesen eventos de flujo desde Kafka, así como también cómo escribir comandos (haga clic en el contenedor), en nuestro caso: para mensajes push manuales al tema que el agente está monitoreando.
Formación
Cliente AlphaVantage
Primero, escribamos un pequeño cliente aiohttp para solicitudes a alphavantage.
La API AlphaVantage tiene un diseño bastante simple y hermoso, por lo que decidí realizar todas las solicitudes a través del método construct_query donde a su vez hay una llamada http.
Traigo todos los campos a snake_case Por comodidad.
Bueno, la decoración logger.catch para un resultado de rastreo hermoso e informativo.
PD: No olvide agregar el token alphavantage localmente a config.yml o exportar la variable de entorno HORTON_SERVICE_APIKEY. Recibimos una ficha aquí.
clase CRUD
Tendremos una colección de valores para almacenar metainformación sobre valores.
Por ahora tendremos la creación de aplicaciones más sencilla, un poco más adelante la ampliaremos, sin embargo, para no hacerte esperar, aquí referencias a clase de aplicación. También te aconsejo que eches un vistazo a la clase de configuración, ya que es responsable de la mayoría de las configuraciones.
principal
Agente para recopilar y mantener una lista de valores.
Entonces, primero obtenemos el objeto de aplicación Faust; es bastante simple. A continuación, declaramos explícitamente un tema para nuestro agente... Aquí vale la pena mencionar qué es, cuál es el parámetro interno y cómo se puede organizar de manera diferente.
Temas en kafka, si queremos saber la definición exacta, es mejor leer apagado. documento, o puedes leer abstracto en Habré en ruso, donde todo también se refleja con bastante precisión :)
Parámetro interno, descrito bastante bien en el documento de faust, nos permite configurar el tema directamente en el código, por supuesto, esto significa los parámetros proporcionados por los desarrolladores de faust, por ejemplo: retención, política de retención (de forma predeterminada, eliminar, pero puede configurar compacto), número de particiones por tema (puntajeshacer, por ejemplo, menos de importancia global aplicaciones fausto).
En general, el agente puede crear un tema administrado con valores globales, sin embargo, a mí me gusta declararlo todo explícitamente. Además, algunos parámetros (por ejemplo, el número de particiones o la política de retención) del tema en el anuncio del agente no se pueden configurar.
Así es como se vería sin definir manualmente el tema:
Bueno, ahora describamos lo que hará nuestro agente :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Entonces, al comienzo del agente, abrimos una sesión aiohttp para solicitudes a través de nuestro cliente. Por lo tanto, al iniciar un trabajador, cuando se inicia nuestro agente, inmediatamente se abrirá una sesión: una, durante todo el tiempo que el trabajador esté en ejecución (o varias, si cambia el parámetro concurrencia de un agente con una unidad predeterminada).
A continuación, seguimos el stream (colocamos el mensaje en _, ya que a nosotros, en este agente, no nos importa el contenido) de los mensajes de nuestro tema, si existen en el desplazamiento actual, de lo contrario nuestro ciclo esperará su llegada. Bueno, dentro de nuestro bucle, registramos la recepción del mensaje, obtenemos una lista de valores activos (get_securities devuelve solo activo de forma predeterminada, consulte el código del cliente) y la guardamos en la base de datos, verificando si hay un valor con el mismo ticker y intercambio en la base de datos, si lo hay, entonces (el documento) simplemente se actualizará.
¡Lanzamos nuestra creación!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
Funciones de PS componente web No consideraré a Fausto en los artículos, por eso colocamos la bandera adecuada.
En nuestro comando de inicio, le dijimos a Fausto dónde buscar el objeto de la aplicación y qué hacer con él (iniciar un trabajador) con el nivel de salida del registro de información. Obtenemos el siguiente resultado:
Veamos el conjunto de particiones. Como podemos ver, se creó un tema con el nombre que designamos en el código, el número de particiones por defecto (8, tomado de particiones_tema - parámetro del objeto de la aplicación), ya que no especificamos un valor individual para nuestro tema (a través de particiones). Al agente iniciado en el trabajador se le asignan las 8 particiones, ya que es la única, pero esto se discutirá con más detalle en la parte sobre agrupación.
Bueno, ahora podemos ir a otra ventana de terminal y enviar un mensaje vacío a nuestro tema:
PD usando @ Mostramos que estamos enviando un mensaje a un tema llamado "collect_securities".
En este caso, el mensaje fue a la partición 6; puede verificar esto yendo a kafdrop en localhost:9000
Al ir a la ventana de la terminal con nuestro trabajador, veremos un mensaje feliz enviado usando loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
También podemos buscar en mongo (usando Robo3T o Studio3T) y ver que los valores están en la base de datos:
No soy multimillonario y, por lo tanto, nos contentamos con la primera opción de visualización.
Felicidad y alegría: el primer agente está listo :)
Agente listo, ¡viva el nuevo agente!
Sí señores, solo hemos recorrido 1/3 del camino preparado por este artículo, pero no se desanimen, porque ahora será más fácil.
Ahora necesitamos un agente que recopile metainformación y la coloque en un documento de recopilación:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Dado que este agente procesará información sobre un valor específico, debemos indicar el ticker (símbolo) de este valor en el mensaje. Para ello en fausto hay Archivos — clases que declaran el esquema de mensaje en el tema del agente.
En este caso, vayamos a registros.pyy describa cómo debería verse el mensaje para este tema:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Como habrás adivinado, faust usa la anotación de tipo python para describir el esquema del mensaje, razón por la cual la versión mínima admitida por la biblioteca es 3.6.
Volvamos al agente, configuremos los tipos y agréguelo:
Como puede ver, pasamos un nuevo parámetro con un esquema al método de inicialización del tema: value_type. Además, todo sigue el mismo esquema, por lo que no veo ningún sentido a detenerme en nada más.
Bueno, el toque final es agregar una llamada al agente de recopilación de metainformación para recopilar_seguridades:
Usamos el esquema previamente anunciado para el mensaje. En este caso utilicé el método .cast ya que no necesitamos esperar el resultado del agente, pero vale la pena mencionar que formas de enviar un mensaje al tema:
cast: no bloquea porque no espera un resultado. No puedes enviar el resultado a otro tema como mensaje.
enviar: no bloquea porque no espera un resultado. Puede especificar un agente en el tema al que irá el resultado.
preguntar - espera un resultado. Puede especificar un agente en el tema al que irá el resultado.
¡Eso es todo con los agentes por hoy!
El equipo de ensueño
Lo último que prometí escribir en esta parte son comandos. Como se mencionó anteriormente, los comandos en fausto son una envoltura alrededor del clic. De hecho, fausto simplemente adjunta nuestro comando personalizado a su interfaz al especificar la clave -A
Después de los agentes anunciados en agentes.py agregar una función con un decorador aplicación.comandollamando al método emitir у recoger_valores:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Así, si llamamos a la lista de comandos, nuestro nuevo comando estará en ella:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Podemos usarlo como cualquier otra persona, así que reiniciemos el trabajador Faust y comencemos una colección completa de valores:
> faust -A horton.agents start-collect-securities
¿Qué pasará después?
En la siguiente parte, tomando como ejemplo el resto de agentes, consideraremos el mecanismo sumidero para buscar extremos en los precios de cierre de las operaciones del año y el lanzamiento cron de agentes.