Vela → cache inteligente para séries temporais e muito mais

Nas fintech, muitas vezes temos que processar volumes enormes de dados sobre taxas de câmbio. Obtemos dados de diferentes fontes, e cada uma delas tem sua própria ideia de como extrapolar as taxas de câmbio para amanhã, depois de amanhã, para o próximo mês e até para os próximos três anos. Se ao menos alguém pudesse prever as taxas corretamente, seria hora de fechar o negócio e simplesmente trocar dinheiro estupidamente. Algumas fontes são mais confiáveis, outras fornecem lixo completo, com raras inclusões de valores quase corretos, mas para casais exóticos. Nosso trabalho é examinar essas dezenas de milhares de valores por segundo e determinar exatamente o que mostrar aos clientes. Precisamos filtrar o valor correto de toneladas de sujeira e lodo, assim como os flamingos fazem no almoço.

Vela → cache inteligente para séries temporais e muito mais

Uma característica distintiva especial dos flamingos é seu enorme bico curvado para baixo, com o qual filtram os alimentos da água ou da lama.
 - Wiki

Assim nasceu a biblioteca Vela, que armazena um cache de estado para vários valores em intervalos de tempo especificados. Nos bastidores, ele filtra dados ruins e desatualizados instantaneamente e também fornece acesso aos mais recentes N valores validados para cada chave (pares de moedas, no nosso caso).

Digamos que coletamos taxas para três pares de moedas. Definição mais simples Vela para armazenar o estado atual, será parecido com isto:

defmodule Pairs do
  use Vela,
    eurusd: [sorter: &Kernel.<=/2],
    eurgbp: [limit: 3, errors: 1],
    eurcad: [validator: Pairs]

  @behaviour Vela.Validator

  @impl Vela.Validator
  def valid?(:eurcad, rate), do: rate > 0
end

Atualizando Valores

Vela.put/3 A função fará o seguinte em sequência:

  • vai causar validator no valor, se algum estiver definido (ver capítulo Validação abaixo);
  • irá adicionar o valor à linha de bons valores se a validação for bem-sucedida ou à linha de serviço :__errors__ de outra forma;
  • causará classificação se sorter definido para uma determinada chave ou simplesmente colocará o valor no topo da lista (LIFO, consulte o capítulo Сортировка abaixo);
  • irá cortar a linha de acordo com o parâmetro :limit passado na criação;
  • retornará a estrutura atualizada Vela.

iex|1 > pairs = %Pairs{}
iex|2 > Vela.put(pairs, :eurcad, 1.0)
#⇒ %Pairs{..., eurcad: [1.0], ...}
iex|3 > Vela.put(pairs, :eurcad, -1.0)
#⇒ %Pairs{__errors__: [eurcad: -1.0], ...}
iex|4 > pairs |> Vela.put(:eurusd, 2.0) |> Vela.put(:eurusd, 1.0)
#⇒ %Pairs{... eurusd: [1.0, 2.0]}

Também Vela implementos Access, para que você possa usar qualquer uma das funções padrão para atualização profunda de estruturas do arsenal para atualizar valores Kernel: Kernel.get_in/2, Kernel.put_in/3, Kernel.update_in/3, Kernel.pop_in/2 e Kernel.get_and_update_in/3.

Validação

Um validador pode ser definido como:

  • função externa com um argumento (&MyMod.my_fun/1), receberá apenas o valor para validação;
  • função externa com dois argumentos, &MyMod.my_fun/2, ela vai ganhar um par serie, value para validação;
  • implementação de módulo Vela.Validator;
  • parâmetro de configuração thresholde - opcionalmente - compare_by, consulte o capítulo Comparação abaixo.

Se a validação for bem-sucedida, o valor será adicionado à lista sob a chave correspondente; caso contrário, a tupla {serie, value} enviado :__errors_.

Comparação

Os valores armazenados nessas linhas podem ser qualquer coisa. Ensinar Vela para compará-los, é necessário transferir compare_by parâmetro na definição da série (a menos que os valores não possam ser comparados com o padrão Kernel.</2); este parâmetro deve ser do tipo (Vela.value() -> number()). Por padrão é simples & &1.

Além disso, você pode passar um parâmetro para a definição de linha comparator para calcular valores delta (min/max); por exemplo, transmitindo Date.diff/2 como comparador, você pode obter os deltas corretos para datas.

Outra maneira conveniente de trabalhar é passar um parâmetro threshold, que define a proporção máxima permitida do novo valor para {min, max} intervalo. Como é especificado como uma porcentagem, o cheque não utiliza comparatormas ainda usa compare_by. Por exemplo, para especificar um valor limite para datas e horas, você deve especificar compare_by: &DateTime.to_unix/1 (para obter um valor inteiro) e threshold: 1, fazendo com que novos valores sejam permitidos apenas se estiverem em ±band intervalo dos valores atuais.

Finalmente, você pode usar Vela.equal?/2 para comparar dois caches. Se os valores definem uma função equal?/2 ou compare/2, então essas funções serão usadas para comparação, caso contrário, usaremos estupidamente ==/2.

Obtendo valores

O processamento do estado atual geralmente começa com a chamada Vela.purge/1, que remove valores obsoletos (se validator amarrado à timestamps). Você pode então ligar Vela.slice/1que retornará keyword com nomes de linhas como chaves e os primeiros valores reais.

Você também pode usar get_in/2/pop_in/2 para acesso de baixo nível aos valores em cada linha.

Aplicação

Vela pode ser extremamente útil como cache de série temporal em um estado de processo como GenServer/Agent. Queremos nunca usar valores de curso obsoletos e, para fazer isso, simplesmente mantemos o processo com o estado processado Vela, com o validador mostrado abaixo.

@impl Vela.Validator
def valid?(_key, %Rate{} = rate),
  do: Rate.age(rate) < @death_age

и Vela.purge/1 remove silenciosamente todos os valores obsoletos sempre que precisamos dos dados. Para acessar os valores reais, simplesmente chamamos Vela.slice/1, e quando for necessário um pequeno histórico do curso (a série inteira), simplesmente o retornamos - já ordenado - com valores validados.

Feliz cache de série temporal!

Fonte: habr.com

Adicionar um comentário