Apache NIFI - Короткий огляд можливостей на практиці

Запровадження

Так вийшло, що на моєму місці роботи мені довелося познайомитися з даною технологією. Почну з невеликої передісторії. На черговому мітингу, нашій команді сказали, що потрібно створити інтеграцію з відомою системою. Під інтеграцією малося на увазі, що ця відома система буде нам надсилати запити через HTTP на певний ендпоінт, а ми, як це не дивно, надсилати назад відповіді у вигляді SOAP повідомлення. Начебто все просто і тривіально. З цього випливає що потрібно.

Завдання

Створити 3 сервіси. Перший із них — Сервіс оновлення БД. Цей сервіс, при надходженні нових даних із сторонньої системи, оновлює дані в базі даних і генерує файл у форматі CSV, для передачі його в наступну систему. Викликається ендпоінт другого сервісу — Сервіс транспортування через FTP, який отримує переданий файл, валідує його, і кладе у файлове сховище через FTP. Третій сервіс - Сервіс передачі даних споживачеві, працює асинхронно з першими двома. Він приймає запит від сторонньої зовнішньої системи, отримання файлу про якому йшлося вище, бере готовий файл відповіді, модифікує його (оновлює поля id, description, linkToFile) і посилає у вигляді SOAP повідомлення. Те в цілому картина наступна: перші два сервіси починають свою роботу тільки тоді, коли прийшли дані для оновлення. Третій сервіс працює постійно, оскільки споживачів інформації багато, близько 1000 запитів на отримання даних за хвилину. Сервіси доступні постійно та їх інстанці розташовуються на різних оточеннях, таких як тест, демо, препрод та прод. Нижче представлено схему роботи цих сервісів. Відразу поясню, що деякі деталі спрощені, щоб уникнути зайвої складності.

Apache NIFI - Короткий огляд можливостей на практиці

Технічне поглиблення

При плануванні розв'язання задачі, спочатку вирішили зробити додатки на java з використанням Spring framework, балансувальником Nginx, базою даних Postgres та іншими технічними та не дуже штуками. Оскільки час на опрацювання технічного рішення дозволяв розглянути інші підходи розв'язання цього завдання, погляд впав на модну у певних колах технологію Apache NIFI. Відразу скажу, що ця технологія дозволила помітити нам ці три сервіси. У цій статті буде описано розробку сервісу транспортування файлу та сервісу передачі даних споживачеві, проте якщо стаття зайде, напишу про сервіс оновлення даних у БД.

Що це таке

NIFI являє собою розподілену архітектуру для швидкого паралельного завантаження та обробки даних, велику кількість плагінів для джерел та перетворень, версіонування конфігурацій та багато іншого. Приємним бонусом є те, що він дуже простий у використанні. Тривіальні процеси, такі як getFile, sendHttpRequest та інші можна уявити у вигляді квадратів. Кожен квадрат представляє процес, взаємодію якого можна побачити малюнку нижче. Більш детальна документація щодо взаємодії налаштування процесів написана тут для тих, кому російською — тут. У документації відмінно розписано як розпакувати та запустити NIFI, а так само, як створити процеси, вони ж квадрати
Ідея написати статтю народилася після тривалих пошуків та структурування отриманої інформації в щось усвідомлене, а також бажання трохи полегшити життя майбутнім розробниками.

Приклад

Розглянуто приклад того, як взаємодіють квадрати між собою. Загальна схема досить проста: Отримуємо HTTP запит (У теорії з файлом у тілі запиту. Для демонстрації можливостей NIFI, в даному прикладі запит стартує процес отримання файлу з локального ФГ), далі відсилаємо назад відповідь, що запит отриманий, паралельно запускається процес отримання файлу з ФГ і далі процес переміщення його через FTP ФХ. Варто пояснити, що процеси взаємодіють між собою у вигляді так званого flowFile. Це базова сутність у NIFI, яка зберігає в собі атрибути та вміст. Вміст - дані, які представлені файлом потоку. Т е грубо кажучи, якщо ви отримали файл з одного квадрата і передає його в інший, контентом буде ваш файл.

Apache NIFI - Короткий огляд можливостей на практиці

Як ви можете помітити, на цьому малюнку зображено загальний процес. HandleHttpRequest – приймає запити, ReplaceText – генерує тіло відповіді, HandleHttpResponse – віддає відповідь. FetchFile — отримує файл із файлового сховища, передає його квадрату PutSftp — кладе цей файл на FTP, за вказаною адресою. Тепер докладніше про цей процес.

В даному випадку – request усьому початок. Подивимося його параметри конфігурації.

Apache NIFI - Короткий огляд можливостей на практиці

Тут все досить тривіально за винятком StandartHttpContextMap — це якийсь сервіс, який дозволяє надсилати та приймати запити. Докладніше і навіть з прикладами можна подивитися. тут

Далі переглянемо параметри конфігурації ReplaceText квадрата. У ній варто звернути увагу на ReplacementValue - це те, що повернеться користувачеві як відповідь. У settings можна регулювати рівень логування, логи можна подивитися {куди розпакували nifi}/nifi-1.9.2/logs там є параметри failure/success — ґрунтуючись на ці параметри можна регулювати процес в цілому. Ті у разі успішної обробки тексту - викликається процес відправки відповіді користувачу, а в іншому випадку ми просто залогуємо неуспішний процес.

Apache NIFI - Короткий огляд можливостей на практиці

У властивостях HandleHttpResponse особливо нічого цікавого немає, крім статусу при успішному створенні відповіді.

Apache NIFI - Короткий огляд можливостей на практиці

З запитом на відповідь розібралися — перейдемо далі до отримання файлу та розміщенням його на FTP сервері. FetchFile - отримує файл за вказаним у налаштуваннях шляху і передає його в наступний процес.

Apache NIFI - Короткий огляд можливостей на практиці

І далі квадрат PutSftp - поміщає файл у файлове сховище. Параметри конфігурації можна побачити нижче.

Apache NIFI - Короткий огляд можливостей на практиці

Варто звернути увагу, що кожен квадрат — це окремий процес, який має бути запущений. Ми розглянули найпростіший приклад, який не вимагає якоїсь складної кастомізації. Далі розглянемо процес трохи складніше, де трохи попишемо на груві.

Більш складний приклад

Сервіс передачі споживачеві вийшов трохи складніше за рахунок процесу модифікації SOAP повідомлення. Загальний процес представлений малюнку нижче.

Apache NIFI - Короткий огляд можливостей на практиці

Тут ідея теж не надто складна: отримали запит від споживача, що йому потрібні дані, відправили відповідь, що отримали повідомлення, запустили процес отримання файлу відповіді, далі відредагували його з певною логікою, після чого передали файл споживачеві у вигляді SOAP повідомлення на сервер.

Думаю, не варто описувати наново ті квадрати, які ми бачили вище — перейдемо відразу до нових. Якщо вам необхідно редагувати якийсь файл і звичайні квадрати типу ReplaceText не підходять, вам доведеться писати свій скрипт. Зробити це можна з допомогою квадрата ExecuteGroogyScript. Налаштування його представлені нижче.

Apache NIFI - Короткий огляд можливостей на практиці

Є два варіанти завантаження скрипта у цей квадрат. Перший — це завантаження файлу зі скриптом. Другий - вставкою скрипта в scriptBody. Наскільки я знаю, квадрат executeScript підтримує кілька ЯП один з них groovy. Розчарую java розробників – на java не можна писати скрипти у таких квадратах. Для тих, кому дуже хочеться, потрібно створити свій кастомний квадрат і підкинути його в систему NIFI. Вся ця операція супроводжується досить тривалими танцями з бубном, якими ми не будемо займатися в рамках цієї статті. Я вибрала мову groovy. Нижче представлений тестовий скрипт, який просто інкрементно оновлює id в SOAP повідомленні. Важливо відмітити. Ви берете файл із flowFile оновлюєте його, не варто забувати, що його потрібно, оновлений, назад туди покласти. Також варто відзначити, що не всі бібліотеки підключені. Може вийти так, що вам таки доведеться імпортувати одну з либ. Мінусом ще є те, що скрипт у даному квадраті досить складно бити. Є спосіб підключитися до JVM NIFI та розпочати процес налагодження. Особисто я запускала у себе локальний додаток та імітувала отримання файлу із сесії. Налагодженням теж займалася локально. Помилки, які вилазять при завантаженні скрипту, досить легко гудатися і пишуться самим NIFI в балку.

import org.apache.commons.io.IOUtils
import groovy.xml.XmlUtil
import java.nio.charset.*
import groovy.xml.StreamingMarkupBuilder

def flowFile = session.get()
if (!flowFile) return
try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        String result = IOUtils.toString(inputStream, "UTF-8");
        def recordIn = new XmlSlurper().parseText(result)
        def element = recordIn.depthFirst().find {
            it.name() == 'id'
        }

        def newId = Integer.parseInt(element.toString()) + 1
        def recordOut = new XmlSlurper().parseText(result)
        recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId

        def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString()
        outputStream.write(res.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
     session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
    log.error("Error during processing of validate.groovy", e)
    session.transfer(flowFile, REL_FAILURE)
}

Власне, на цьому кастомізація квадрата закінчується. Далі оновлений файл передається в квадрат, який займається посилкою файлу на сервер. Нижче наведено налаштування цього квадрата.

Apache NIFI - Короткий огляд можливостей на практиці

Описуємо метод, яким буде передаватися повідомлення SOAP. Пишемо куди. Далі потрібно зазначити, що це SOAP.

Apache NIFI - Короткий огляд можливостей на практиці

Додаємо декілька властивостей таких як хост та дія (soapAction). Зберігаємо, перевіряємо. Докладніше як посилати SOAP запити можна переглянути тут

Ми розглянули кілька варіантів використання процесів NIFI. Як вони взаємодіють та яка від них реальна користь. Розглянуті приклади є тестовими та трохи відрізняються від того, що реально на бою. Сподіваюся, ця стаття буде трохи корисною для розробників. Дякую за увагу. Якщо є якісь питання, пишіть. Постараюся відповісти.

Джерело: habr.com

Додати коментар або відгук