Apache NIFI - Stručný přehled příležitostí v praxi

úvod

Tak se stalo, že jsem se na svém současném působišti musel s touto technologií seznámit. Začnu malým pozadím. Na další schůzce bylo našemu týmu řečeno, že potřebujeme vytvořit integraci s známý systém. Integrací bylo myšleno, že tento známý systém nám bude posílat požadavky přes HTTP na konkrétní koncový bod a my, kupodivu, budeme posílat zpět odpovědi ve formě zprávy SOAP. Všechno se zdá jednoduché a triviální. Z toho vyplývá, že potřebujete...

Úkol

Vytvořte 3 služby. První z nich je Služba aktualizace databáze. Tato služba, když dorazí nová data ze systému třetí strany, aktualizuje data v databázi a vygeneruje soubor ve formátu CSV pro přenos do dalšího systému. Zavolá se koncový bod druhé služby – FTP Transport Service, která přijme přenesený soubor, ověří jej a uloží jej do úložiště souborů přes FTP. Třetí služba, služba přenosu dat pro spotřebitele, pracuje asynchronně s prvními dvěma. Přijme požadavek od externího systému třetí strany, aby obdržel výše uvedený soubor, vezme připravený soubor odpovědí, upraví jej (aktualizuje pole id, description, linkToFile) a odešle odpověď ve formě zprávy SOAP. To znamená, že celkový obrázek je následující: první dvě služby začnou pracovat, až když dorazí data pro aktualizaci. Třetí služba funguje neustále, protože existuje mnoho spotřebitelů informací, asi 1000 žádostí o data za minutu. Služby jsou dostupné neustále a jejich instance jsou umístěny v různých prostředích, jako jsou testovací, demo, předprodukční a prod. Níže je schéma toho, jak tyto služby fungují. Hned upřesním, že některé detaily byly zjednodušeny, aby se předešlo zbytečné složitosti.

Apache NIFI - Stručný přehled příležitostí v praxi

Technické prohloubení

Při plánování řešení problému jsme se nejprve rozhodli dělat aplikace v Javě pomocí frameworku Spring, Nginx balanceru, Postgres databáze a dalších technických i ne tak technických věcí. Protože čas na vývoj technického řešení nám umožnil uvažovat o jiných přístupech k řešení tohoto problému, náš pohled padl na technologii Apache NIFI, která je v určitých kruzích módní. Hned řeknu, že tato technologie nám umožnila všimnout si těchto 3 služeb. Tento článek bude popisovat vývoj služby přenosu souborů a služby přenosu dat spotřebiteli, ale pokud bude článek užitečný, napíšu o službě pro aktualizaci dat v databázi.

Co je to

NIFI je distribuovaná architektura pro rychlé paralelní načítání a zpracování dat, velké množství pluginů pro zdroje a transformace, verzování konfigurací a mnoho dalšího. Příjemným bonusem je, že se velmi snadno používá. Triviální procesy jako getFile, sendHttpRequest a další mohou být reprezentovány jako čtverce. Každý čtverec představuje proces, jehož vzájemné působení je vidět na obrázku níže. Byla napsána podrobnější dokumentace o interakcích s nastavením procesu zde pro ty, kteří mluví rusky - zde. Dokumentace dokonale popisuje, jak rozbalit a spustit NIFI, a také jak vytvořit procesy, známé také jako čtverce
Nápad napsat článek se zrodil po dlouhém hledání a strukturování obdržených informací do něčeho vědomého, stejně jako touha usnadnit život budoucím vývojářům..

příklad

Uvažuje se příklad toho, jak se čtverce vzájemně ovlivňují. Obecné schéma je poměrně jednoduché: Přijmeme požadavek HTTP (Teoreticky se souborem v těle požadavku. Abychom demonstrovali schopnosti NIFI, v tomto příkladu požadavek spustí proces příjmu souboru z místního úložiště souborů ), poté zašleme zpět odpověď, že požadavek byl přijat, souběžně probíhá proces příjmu souboru z FH a následně proces jeho přesunu přes FTP do FH. Stojí za to objasnit, že procesy na sebe vzájemně působí prostřednictvím tzv. flowFile. Toto je základní entita v NIFI, která ukládá atributy a obsah. Obsah jsou data, která jsou reprezentována souborem proudu. Zhruba řečeno, pokud přijmete soubor z jednoho čtverce a přenesete jej do jiného, ​​bude obsahem váš soubor.

Apache NIFI - Stručný přehled příležitostí v praxi

Jak vidíte, tento obrázek ukazuje obecný proces. HandleHttpRequest – přijímá požadavky, ReplaceText – generuje tělo odpovědi, HandleHttpResponse – odesílá odpověď. FetchFile - přijme soubor ze souborového úložiště, přenese jej na čtverec PutSftp - vloží tento soubor na FTP na zadanou adresu. Nyní více o tomto procesu.

V tomto případě je žádost počátkem všeho. Podívejme se na jeho konfigurační parametry.

Apache NIFI - Stručný přehled příležitostí v praxi

Vše je zde celkem triviální s výjimkou StandardHttpContextMap - to je druh služby, která vám umožňuje odesílat a přijímat požadavky. Podrobněji a dokonce i s příklady můžete vidět - zde

Dále se podíváme na konfigurační parametry čtverce ReplaceText. Stojí za to věnovat pozornost ReplacementValue - to je to, co se uživateli vrátí ve formě odpovědi. V nastavení můžete upravit úroveň protokolování, můžete vidět protokoly {kde jste rozbalili nifi}/nifi-1.9.2/logs, jsou zde také parametry selhání/úspěchu - na základě těchto parametrů můžete regulovat proces jako celek . To znamená, že v případě úspěšného zpracování textu bude vyvolán proces odeslání odpovědi uživateli a v jiném případě neúspěšný proces jednoduše zaprotokolujeme.

Apache NIFI - Stručný přehled příležitostí v praxi

Ve vlastnostech HandleHttpResponse není nic zvlášť zajímavého kromě stavu, kdy je odpověď úspěšně vytvořena.

Apache NIFI - Stručný přehled příležitostí v praxi

Vyřešili jsme požadavek a odpověď – pojďme k přijetí souboru a jeho umístění na FTP server. FetchFile – přijme soubor na cestě zadané v nastavení a předá jej dalšímu procesu.

Apache NIFI - Stručný přehled příležitostí v praxi

A pak čtverec PutSftp - umístí soubor do úložiště souborů. Konfigurační parametry můžeme vidět níže.

Apache NIFI - Stručný přehled příležitostí v praxi

Stojí za to věnovat pozornost skutečnosti, že každý čtverec je samostatný proces, který musí být spuštěn. Podívali jsme se na nejjednodušší příklad, který nevyžaduje žádné složité přizpůsobení. Dále se podíváme na proces trochu složitější, kde napíšeme něco o drážkách.

Složitější příklad

Služba přenosu dat spotřebiteli se ukázala být trochu složitější kvůli procesu úpravy zprávy SOAP. Obecný postup je znázorněn na obrázku níže.

Apache NIFI - Stručný přehled příležitostí v praxi

Myšlenka zde také není nijak zvlášť komplikovaná: obdrželi jsme požadavek od spotřebitele, že potřebuje data, odeslali jsme odpověď, že obdržel zprávu, zahájili proces přijímání souboru odpovědí, poté jej upravili s určitou logikou a pak přenesl soubor spotřebiteli ve formě zprávy SOAP na server.

Myslím, že není třeba znovu popisovat ty čtverce, které jsme viděli výše - pojďme rovnou k těm novým. Pokud potřebujete upravit jakýkoli soubor a běžné čtverečky typu ReplaceText nejsou vhodné, budete si muset napsat vlastní skript. To lze provést pomocí čtverce ExecuteGroogyScript. Jeho nastavení je uvedeno níže.

Apache NIFI - Stručný přehled příležitostí v praxi

Existují dvě možnosti pro načtení skriptu do tohoto čtverce. První je stažením souboru se skriptem. Druhým je vložení skriptu do scriptBody. Pokud vím, čtverec executeScript podporuje několik jazyků - jeden z nich je groovy. Zklamu java vývojáře - v takových čtvercích nelze psát skripty v javě. Pro ty, kteří opravdu chtějí, musíte vytvořit svůj vlastní čtverec a přidat jej do systému NIFI. Celou tuto operaci provází docela dlouhý tanec s tamburínou, kterému se v tomto článku nebudeme věnovat. Vybral jsem si groovy jazyk. Níže je testovací skript, který jednoduše postupně aktualizuje id ve zprávě SOAP. Je důležité poznamenat. Vezmete soubor z flowFile a aktualizujete jej, nezapomeňte, že jej tam musíte vrátit, aktualizovaný. Za zmínku také stojí, že ne všechny knihovny jsou zahrnuty. Může se stát, že ještě budete muset importovat jednu z knihoven. Další nevýhodou je, že skript v tomto čtverci se dost obtížně ladí. Existuje způsob, jak se připojit k NIFI JVM a zahájit proces ladění. Osobně jsem spustil lokální aplikaci a simuloval příjem souboru z relace. Lokálně jsem také ladil. Chyby, které se objeví při načítání skriptu, Google celkem snadno zapisuje a do protokolu je zapisuje samo NIFI.

import org.apache.commons.io.IOUtils
import groovy.xml.XmlUtil
import java.nio.charset.*
import groovy.xml.StreamingMarkupBuilder

def flowFile = session.get()
if (!flowFile) return
try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        String result = IOUtils.toString(inputStream, "UTF-8");
        def recordIn = new XmlSlurper().parseText(result)
        def element = recordIn.depthFirst().find {
            it.name() == 'id'
        }

        def newId = Integer.parseInt(element.toString()) + 1
        def recordOut = new XmlSlurper().parseText(result)
        recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId

        def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString()
        outputStream.write(res.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
     session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
    log.error("Error during processing of validate.groovy", e)
    session.transfer(flowFile, REL_FAILURE)
}

Vlastně zde přizpůsobení náměstí končí. Dále je aktualizovaný soubor přenesen do čtverce, který je zodpovědný za odeslání souboru na server. Níže jsou uvedena nastavení pro tento čtverec.

Apache NIFI - Stručný přehled příležitostí v praxi

Popisujeme způsob, kterým bude SOAP zpráva přenášena. Píšeme kde. Dále musíte uvést, že se jedná o SOAP.

Apache NIFI - Stručný přehled příležitostí v praxi

Přidejte několik vlastností, jako je hostitel a akce (soapAction). Ukládáme a kontrolujeme. Můžete vidět další podrobnosti o tom, jak odesílat požadavky SOAP zde

Podívali jsme se na několik možností použití procesů NIFI. Jak se ovlivňují a jaký je jejich skutečný přínos? Uvažované příklady jsou testovací a mírně se liší od toho, co se skutečně děje v boji. Doufám, že tento článek bude pro vývojáře trochu užitečný. Děkuji za pozornost. Pokud máte nějaké dotazy, napište. Pokusím se odpovědět.

Zdroj: www.habr.com

Přidat komentář