Vela → розумний кеш для time series і не тільки

У фінтеху нам часто доводиться обробляти досить потужні обсяги даних курсів обміну валют. Ми отримуємо дані з різних джерел, і кожен із них має власне уявлення про те, як екстраполювати значення курсів на завтра, післязавтра, наступний місяць і навіть наступні три роки. Якби хтось умів передбачати курси правильно, можна було б закривати бізнес і просто тупо змінювати гроші туди-сюди. Деякі джерела мають більшу довіру, деякі постачають суцільно сміття, з рідкісними вкрапленнями майже правильних значень, зате для екзотичних пар. Наша робота полягає в тому, щоб просіяти ці десятки тисяч значень за секунду та визначити, що саме показати замовникам. Нам потрібно відфільтрувати єдине правильне значення з тонни бруду та мулу, як це роблять фламінго на обіді.

Vela → розумний кеш для time series і не тільки

Особливою відмітною ознакою фламінго є масивний вигнутий вниз дзьоб, за допомогою якого вони фільтрують їжу з води або мулу.
 - Вікі

Так народилася бібліотека Velaяка зберігає кеш стану для декількох значень у заданих часових інтервалах. Під капотом вона на льоту відсіває погані та застарілі дані, а також надає доступ до останніх. N значенням, що пройшли валідацію, для кожного ключа (пари валют, у нашому випадку).

Допустимо, ми збираємо курси для трьох пар валют. Найпростіше визначення Vela для зберігання актуального стану виглядатиме якось так:

defmodule Pairs do
  use Vela,
    eurusd: [sorter: &Kernel.<=/2],
    eurgbp: [limit: 3, errors: 1],
    eurcad: [validator: Pairs]

  @behaviour Vela.Validator

  @impl Vela.Validator
  def valid?(:eurcad, rate), do: rate > 0
end

Оновлення значень

Vela.put/3 функція послідовно зробить таке:

  • викличе validator на значенні, якщо такий визначений (див. розділ Валідація нижче);
  • додасть значення або до ряду хороших значень, якщо валідація закінчилася успішно, або до службового ряду :__errors__ в зворотньому випадку;
  • викличе сортування якщо sorter визначений для даного ключа, або просто покладе значення в голову списку (LIFO, див главку Сортування нижче);
  • обріже ряд відповідно до параметра :limit переданому під час створення;
  • поверне оновлену структуру Vela.

iex|1 > pairs = %Pairs{}
iex|2 > Vela.put(pairs, :eurcad, 1.0)
#⇒ %Pairs{..., eurcad: [1.0], ...}
iex|3 > Vela.put(pairs, :eurcad, -1.0)
#⇒ %Pairs{__errors__: [eurcad: -1.0], ...}
iex|4 > pairs |> Vela.put(:eurusd, 2.0) |> Vela.put(:eurusd, 1.0)
#⇒ %Pairs{... eurusd: [1.0, 2.0]}

Також Vela імплементує Access, так що можна для оновлення значень скористатися будь-якою із стандартних функцій для глибокого оновлення структур з арсеналу Kernel: Kernel.get_in/2, Kernel.put_in/3, Kernel.update_in/3, Kernel.pop_in/2 та Kernel.get_and_update_in/3.

Валідація

Валідатор може бути визначений як:

  • зовнішня функція з одним аргументом (&MyMod.my_fun/1), вона отримає лише значення для валідації;
  • зовнішня функція з двома аргументами, &MyMod.my_fun/2, вона отримає пару serie, value для валідації;
  • модуль, що імплементує Vela.Validator;
  • конфігураційний параметр threshold, і - опціонально - compare_by, див главку порівняння нижче.

Якщо валідація пройшла успішно, значення додається до списку під відповідним ключем, інакше кортеж {serie, value} відправляється в :__errors_.

Порівняння

Значення, що зберігаються у цих рядах, можуть бути будь-якими. Щоб навчити Vela їх порівнювати, необхідно передати compare_by параметр визначення ряду (якщо тільки значення не можуть бути порівняні стандартним Kernel.</2); цей параметр повинен мати тип (Vela.value() -> number()). За промовчанням це просто & &1.

Також, для визначення ряду можна передати параметр comparator для обчислення значень дельт (min/max); наприклад, передаючи Date.diff/2 як компаратор, можна отримати правильні дельти для дат.

Іншим зручним способом роботи є передача параметра threshold, який визначає максимально допустиме відношення нового значення до {min, max} інтервалу. Оскільки він заданий у відсотках, перевірка не використовує comparator, але все ще використовує compare_by. Наприклад, щоб вказати граничне значення для часу дат, необхідно вказати compare_by: &DateTime.to_unix/1 (для отримання цілого чисельного значення) та threshold: 1, в результаті чого нові значення будуть дозволені, тільки якщо вони знаходяться в ±band інтервалі від поточних значень.

Зрештою, можна використовувати Vela.equal?/2 для порівняння двох кешів. Якщо значення визначають функцію equal?/2 або compare/2, то ці функції будуть використані для порівняння, в іншому випадку ми тупо використовуємо ==/2.

Набуття значень

Обробка поточного стану зазвичай починається з виклику Vela.purge/1, який прибирає застарілі значення (якщо validator зав'язаний на timestamps). Потім можна викликати Vela.slice/1, яка поверне keyword з іменами рядів як ключі та першими, актуальними значеннями.

Також можна скористатися get_in/2/pop_in/2 для низькорівневого доступу до значень у кожному ряді.

додаток

Vela може виявитися надзвичайно корисною як кеш тимчасових рядів у стейті процесу типу GenServer/Agent. Ми хочемо ніколи не використовувати застарілі значення курсів, і для цього ми просто тримаємо процес зі станом, що обробляється Velaз валідатором, показаним нижче.

@impl Vela.Validator
def valid?(_key, %Rate{} = rate),
  do: Rate.age(rate) < @death_age

и Vela.purge/1 спокійно видаляє всі застарілі значення щоразу, коли нам потрібні дані. Для доступу до актуальних значень ми просто викликаємо Vela.slice/1, а коли потрібна невелика історія з курсу (весь ряд цілком), ми просто повертаємо його — вже відсортованим — із провалідованими значеннями.

Вдалого кешування часових рядів!

Джерело: habr.com

Додати коментар або відгук