Uvod
Tako se dogodilo da sam se na trenutnom radnom mjestu morao upoznati s ovom tehnologijom. Počet ću s malo pozadine. Na sljedećem sastanku našem je timu rečeno da moramo stvoriti integraciju s poznati sustav. Pod integracijom se mislilo na to da će nam ovaj dobro poznati sustav slati zahtjeve putem HTTP-a do određene krajnje točke, a mi ćemo, što je čudno, poslati odgovore u obliku SOAP poruke. Sve izgleda jednostavno i trivijalno. Iz ovoga proizilazi da trebate...
Zadatak
Napravite 3 usluge. Prva od njih je usluga ažuriranja baze podataka. Ova usluga, kada novi podaci stignu iz sustava treće strane, ažurira podatke u bazi podataka i generira datoteku u CSV formatu za prijenos na sljedeći sustav. Krajnja točka druge usluge se zove - FTP Transport Service, koja prima prenesenu datoteku, potvrđuje je i stavlja u pohranu datoteka putem FTP-a. Treća usluga, usluga prijenosa podataka potrošača, radi asinkrono s prve dvije. Prima zahtjev od vanjskog sustava treće strane za primanje gore navedene datoteke, uzima spremnu datoteku odgovora, modificira je (ažurira ID, opis, polja linkToFile) i šalje odgovor u obliku SOAP poruke. Odnosno, ukupna slika je sljedeća: prve dvije usluge počinju s radom tek kada stignu podaci za ažuriranje. Treći servis radi konstantno jer ima mnogo potrošača informacija, oko 1000 zahtjeva za podacima u minuti. Usluge su dostupne stalno, a njihove instance nalaze se u različitim okruženjima, kao što su test, demo, pretprodukcija i proizvodnja. Ispod je dijagram koji prikazuje kako te usluge rade. Dopustite mi da odmah pojasnim da su neki detalji pojednostavljeni kako bi se izbjegla nepotrebna složenost.
Tehničko produbljivanje
Prilikom planiranja rješenja problema prvo smo odlučili raditi aplikacije u Javi koristeći Spring framework, Nginx balancer, Postgres bazu i ostale tehničke i manje tehničke stvari. Budući da nam je vrijeme za razvoj tehničkog rješenja omogućilo da razmotrimo i druge pristupe rješavanju ovog problema, pogled nam je pao na Apache NIFI tehnologiju, koja je u nekim krugovima moderna. Odmah ću reći da nam je ova tehnologija omogućila da primijetimo ove 3 usluge. Ovaj članak će opisati razvoj usluge prijenosa datoteka i usluge prijenosa podataka do potrošača, ali ako je članak koristan, pisati ću o usluzi ažuriranja podataka u bazi podataka.
Što je ovo
NIFI je distribuirana arhitektura za brzo paralelno učitavanje i obradu podataka, velik broj dodataka za izvore i transformacije, verzioniranje konfiguracija i još mnogo toga. Dobar bonus je to što je vrlo jednostavan za korištenje. Trivijalni procesi kao što su getFile, sendHttpRequest i drugi mogu se prikazati kao kvadrati. Svaki kvadrat predstavlja proces čija se interakcija može vidjeti na donjoj slici. Napisana je detaljnija dokumentacija o interakcijama postavljanja procesa
Ideja za pisanjem članka rodila se nakon dugog traženja i strukturiranja dobivenih informacija u nešto svjesno, kao i želje da se budućim programerima barem malo olakša život.
Primjer
Razmatran je primjer međusobnog djelovanja kvadrata. Opća shema je prilično jednostavna: primamo HTTP zahtjev (u teoriji, s datotekom u tijelu zahtjeva. Kako bismo demonstrirali mogućnosti NIFI-ja, u ovom primjeru zahtjev pokreće proces primanja datoteke iz lokalne pohrane datoteka ), zatim šaljemo natrag odgovor da je zahtjev zaprimljen, paralelno ide proces zaprimanja datoteke od FH i zatim proces premještanja FTP-om na FH. Vrijedno je pojasniti da procesi međusobno djeluju putem takozvane flowFile. Ovo je osnovni entitet u NIFI-ju koji pohranjuje atribute i sadržaj. Sadržaj su podaci koje predstavlja datoteka toka. Odnosno, grubo rečeno, ako primite datoteku s jednog kvadrata i prenesete je na drugi, sadržaj će biti vaša datoteka.
Kao što vidite, ova slika prikazuje opći proces. HandleHttpRequest - prihvaća zahtjeve, ReplaceText - generira tijelo odgovora, HandleHttpResponse - šalje odgovor. FetchFile - prima datoteku iz skladišta datoteka, prenosi je na trg PutSftp - stavlja ovu datoteku na FTP, na navedenu adresu. Sada više o ovom procesu.
U ovom slučaju zahtjev je početak svega. Pogledajmo njegove konfiguracijske parametre.
Ovdje je sve prilično trivijalno s izuzetkom StandardHttpContextMap - ovo je vrsta usluge koja vam omogućuje slanje i primanje zahtjeva. Detaljnije, pa čak i s primjerima, možete vidjeti -
Zatim, pogledajmo konfiguracijske parametre kvadrata ReplaceText. Vrijedno je obratiti pozornost na ReplacementValue - to je ono što će biti vraćeno korisniku u obliku odgovora. U postavkama možete prilagoditi razinu zapisivanja, možete vidjeti zapisnike {gdje ste raspakirali nifi}/nifi-1.9.2/logove, tu su i parametri neuspjeha/uspjeha - na temelju tih parametara možete regulirati proces u cjelini . Odnosno, u slučaju uspješne obrade teksta, bit će pozvan proces slanja odgovora korisniku, au drugom slučaju jednostavno ćemo zabilježiti neuspješan proces.
U svojstvima HandleHttpResponse nema ništa posebno zanimljivo osim statusa kada je odgovor uspješno kreiran.
Razvrstali smo zahtjev i odgovor - idemo dalje na primanje datoteke i njeno postavljanje na FTP poslužitelj. FetchFile - prima datoteku na stazi navedenoj u postavkama i prosljeđuje je sljedećem procesu.
A zatim PutSftp kvadrat - smješta datoteku u pohranu datoteka. Dolje možemo vidjeti konfiguracijske parametre.
Vrijedno je obratiti pozornost na činjenicu da je svaki kvadrat zaseban proces koji se mora pokrenuti. Pogledali smo najjednostavniji primjer koji ne zahtijeva nikakvu složenu prilagodbu. Zatim ćemo pogledati proces malo kompliciraniji, gdje ćemo malo pisati o utorima.
Složeniji primjer
Usluga prijenosa podataka potrošaču pokazala se malo kompliciranijom zbog procesa modificiranja SOAP poruke. Opći postupak prikazan je na donjoj slici.
Ovdje ideja također nije posebno komplicirana: primili smo zahtjev od potrošača da mu trebaju podaci, poslali odgovor da je primio poruku, pokrenuli proces primanja datoteke odgovora, zatim je uredili određenom logikom, a zatim prenio datoteku potrošaču u obliku SOAP poruke na poslužitelj.
Mislim da nema potrebe ponovno opisivati kvadrate koje smo vidjeli gore - prijeđimo odmah na nove. Ako trebate urediti bilo koju datoteku, a obični kvadrati tipa ReplaceText nisu prikladni, morat ćete napisati vlastitu skriptu. To se može učiniti pomoću kvadrata ExecuteGroogyScript. Njegove postavke prikazane su u nastavku.
Postoje dvije opcije za učitavanje skripte u ovaj kvadrat. Prvi je preuzimanjem datoteke sa skriptom. Drugi je umetanjem skripte u scriptBody. Koliko ja znam, executeScript square podržava nekoliko jezika - jedan od njih je groovy. Razočarat ću Java programere - ne možete pisati skripte u Javi u takvim kvadratima. Za one koji to stvarno žele, morate kreirati vlastiti kvadrat i dodati ga u NIFI sustav. Cijelu ovu operaciju prati dosta dug ples uz tamburicu, kojim se u ovom članku nećemo baviti. Izabrao sam groovy jezik. Ispod je testna skripta koja jednostavno postupno ažurira ID u SOAP poruci. Važno je napomenuti. Uzimate datoteku iz flowFile-a i ažurirate je, ne zaboravite da je trebate vratiti tamo, ažuriranu. Također je vrijedno napomenuti da nisu uključene sve knjižnice. Može se dogoditi da ipak morate uvesti neku od libova. Još jedna mana je da je skriptu u ovom kvadratu prilično teško ispraviti. Postoji način povezivanja na NIFI JVM i pokretanje procesa otklanjanja pogrešaka. Osobno sam pokrenuo lokalnu aplikaciju i simulirao primanje datoteke sa sesije. Također sam radio debugging lokalno. Pogreške koje se pojavljuju prilikom učitavanja skripte prilično su jednostavne za Google, a sam NIFI ih upisuje u dnevnik.
import org.apache.commons.io.IOUtils
import groovy.xml.XmlUtil
import java.nio.charset.*
import groovy.xml.StreamingMarkupBuilder
def flowFile = session.get()
if (!flowFile) return
try {
flowFile = session.write(flowFile, { inputStream, outputStream ->
String result = IOUtils.toString(inputStream, "UTF-8");
def recordIn = new XmlSlurper().parseText(result)
def element = recordIn.depthFirst().find {
it.name() == 'id'
}
def newId = Integer.parseInt(element.toString()) + 1
def recordOut = new XmlSlurper().parseText(result)
recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId
def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString()
outputStream.write(res.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
log.error("Error during processing of validate.groovy", e)
session.transfer(flowFile, REL_FAILURE)
}
Zapravo, ovdje završava prilagodba kvadrata. Zatim se ažurirana datoteka prenosi na trg koji je odgovoran za slanje datoteke na poslužitelj. Ispod su postavke za ovaj kvadrat.
Opisujemo metodu kojom će se SOAP poruka prenositi. Pišemo gdje. Zatim morate naznačiti da je ovo SOAP.
Dodajte nekoliko svojstava kao što su host i akcija (soapAction). Štedimo i provjeravamo. Možete vidjeti više detalja o tome kako poslati SOAP zahtjeve
Pregledali smo nekoliko opcija za korištenje NIFI procesa. Kako oni međusobno djeluju i koja je njihova stvarna korist? Razmotreni primjeri su testni i malo se razlikuju od onoga što se stvarno događa u borbi. Nadam se da će ovaj članak biti malo koristan za programere. Hvala vam na pažnji. Ako imate pitanja, pišite. Pokušat ću odgovoriti.
Izvor: www.habr.com