Vela → caché inteligente para series temporales y más

En fintech, a menudo tenemos que procesar volúmenes bastante grandes de datos sobre tipos de cambio de divisas. Obtenemos datos de diferentes fuentes, y cada una de ellas tiene su propia idea de cómo extrapolar los tipos de cambio para mañana, pasado mañana, el próximo mes e incluso los próximos tres años. Si alguien pudiera predecir las tasas correctamente, sería hora de cerrar el negocio y simplemente cambiar estúpidamente dinero de un lado a otro. Algunas fuentes son más fiables, otras proporcionan basura completa, con raras inclusiones de valores casi correctos, pero para parejas exóticas. Nuestro trabajo es examinar estas decenas de miles de valores por segundo y determinar qué mostrar exactamente a los clientes. Necesitamos filtrar el valor correcto entre toneladas de suciedad y limo, tal como lo hacen los flamencos en el almuerzo.

Vela → caché inteligente para series temporales y más

Una característica distintiva especial de los flamencos es su enorme pico curvado hacia abajo, con el que filtran los alimentos del agua o el barro.
 - Wiki

Así nació la biblioteca. Vela, que almacena un caché de estado para múltiples valores en intervalos de tiempo específicos. Debajo del capó, filtra datos incorrectos y obsoletos sobre la marcha y también proporciona acceso a las últimas novedades. N valores validados para cada clave (pares de divisas, en nuestro caso).

Digamos que recopilamos tasas para tres pares de divisas. Definición más simple Vela para almacenar el estado actual se verá así:

defmodule Pairs do
  use Vela,
    eurusd: [sorter: &Kernel.<=/2],
    eurgbp: [limit: 3, errors: 1],
    eurcad: [validator: Pairs]

  @behaviour Vela.Validator

  @impl Vela.Validator
  def valid?(:eurcad, rate), do: rate > 0
end

Actualizando valores

Vela.put/3 La función hará lo siguiente en secuencia:

  • causará validator en el valor, si hay alguno definido (ver capítulo Validación abajo);
  • agregará el valor a la fila de buenos valores si la validación fue exitosa, o a la fila de servicio :__errors__ de lo contrario;
  • causará clasificación si sorter definido para una clave determinada, o simplemente pondrá el valor al principio de la lista (LIFO, ver capítulo Сортировка abajo);
  • recortará la fila según el parámetro :limit transmitido a la creación;
  • devolverá la estructura actualizada Vela.

iex|1 > pairs = %Pairs{}
iex|2 > Vela.put(pairs, :eurcad, 1.0)
#⇒ %Pairs{..., eurcad: [1.0], ...}
iex|3 > Vela.put(pairs, :eurcad, -1.0)
#⇒ %Pairs{__errors__: [eurcad: -1.0], ...}
iex|4 > pairs |> Vela.put(:eurusd, 2.0) |> Vela.put(:eurusd, 1.0)
#⇒ %Pairs{... eurusd: [1.0, 2.0]}

también Vela implementos Access, por lo que puede utilizar cualquiera de las funciones estándar para la actualización profunda de estructuras del arsenal para actualizar valores Kernel: Kernel.get_in/2, Kernel.put_in/3, Kernel.update_in/3, Kernel.pop_in/2y Kernel.get_and_update_in/3.

Validación

Un validador se puede definir como:

  • función externa con un argumento (&MyMod.my_fun/1), solo recibirá el valor para validación;
  • función externa con dos argumentos, &MyMod.my_fun/2, ella conseguirá un par serie, value para validación;
  • implementación del módulo Vela.Validator;
  • parámetro de configuración threshold, y - opcionalmente - compare_by, ver capítulo Comparación abajo

Si la validación es exitosa, el valor se agrega a la lista bajo la clave correspondiente; de ​​lo contrario, la tupla {serie, value} enviado en :__errors_.

Comparación

Los valores almacenados en estas filas pueden ser cualquier cosa. Enseñar Vela para compararlos es necesario transferir compare_by parámetro en la definición de la serie (a menos que los valores no se puedan comparar con el estándar Kernel.</2); este parámetro debe ser de tipo (Vela.value() -> number()). Por defecto es simple & &1.

Además, puede pasar un parámetro a la definición de fila. comparator para calcular los valores delta (min/max); por ejemplo, transmitiendo Date.diff/2 como comparador, puede obtener los deltas correctos para las fechas.

Otra forma conveniente de trabajar es pasar un parámetro threshold, que define la relación máxima permitida entre el nuevo valor y {min, max} intervalo. Dado que se especifica como porcentaje, el cheque no utiliza comparatorpero todavía usa compare_by. Por ejemplo, para especificar un valor de umbral para fechas y horas, debe especificar compare_by: &DateTime.to_unix/1 (para obtener un valor entero) y threshold: 1, provocando que se permitan nuevos valores sólo si están en ±band intervalo de los valores actuales.

Finalmente, puedes usar Vela.equal?/2 para comparar dos cachés. Si los valores definen una función equal?/2 o compare/2, entonces estas funciones se usarán para comparar; de lo contrario, usaremos estúpidamente ==/2.

Obteniendo valores

El procesamiento del estado actual generalmente comienza con una llamada Vela.purge/1, que elimina valores obsoletos (si validator atado a timestamps). Luego puedes llamar Vela.slice/1que regresará keyword con nombres de filas como claves y los primeros valores reales.

También puedes usar get_in/2/pop_in/2 para acceso de bajo nivel a los valores de cada fila.

solicitud

Vela puede ser extremadamente útil como caché de series temporales en un estado de proceso como GenServer/Agent. Nunca queremos utilizar valores de curso obsoletos y, para ello, simplemente mantenemos el proceso con el estado procesado. Vela, con el validador que se muestra a continuación.

@impl Vela.Validator
def valid?(_key, %Rate{} = rate),
  do: Rate.age(rate) < @death_age

и Vela.purge/1 Elimina silenciosamente todos los valores obsoletos cada vez que necesitamos los datos. Para acceder a los valores reales simplemente llamamos Vela.slice/1, y cuando se requiere un pequeño historial del curso (la serie completa), simplemente lo devolvemos -ya ordenado- con valores validados.

¡Feliz almacenamiento en caché de series temporales!

Fuente: habr.com

Añadir un comentario