Apache NIFI - Uma breve visão geral das oportunidades na prática

Introdução

Acontece que no meu atual local de trabalho tive que me familiarizar com essa tecnologia. Vou começar com um pequeno histórico. Na reunião seguinte, nossa equipe foi informada que precisávamos criar integração com sistema conhecido. Por integração, significava que esse sistema bem conhecido nos enviaria solicitações via HTTP para um endpoint específico e nós, curiosamente, enviaríamos respostas na forma de uma mensagem SOAP. Tudo parece simples e trivial. Daí resulta que você precisa...

Tarefa

Crie 3 serviços. O primeiro deles é o Serviço de Atualização de Banco de Dados. Este serviço, quando novos dados chegam de um sistema de terceiros, atualiza os dados do banco de dados e gera um arquivo em formato CSV para transferi-lo para o próximo sistema. O ponto final do segundo serviço é chamado - FTP Transport Service, que recebe o arquivo transferido, valida-o e o coloca no armazenamento de arquivos via FTP. O terceiro serviço, o serviço de transferência de dados do consumidor, funciona de forma assíncrona com os dois primeiros. Ele recebe uma solicitação de um sistema externo de terceiros para receber o arquivo discutido acima, pega o arquivo de resposta pronto, modifica-o (atualiza os campos id, descrição, linkToFile) e envia a resposta na forma de uma mensagem SOAP. Ou seja, o quadro geral é o seguinte: os dois primeiros serviços só iniciam seu trabalho quando chegam os dados para atualização. O terceiro serviço funciona constantemente porque são muitos consumidores de informação, cerca de 1000 pedidos de dados por minuto. Os serviços estão disponíveis constantemente e suas instâncias estão localizadas em diferentes ambientes, como teste, demonstração, pré-produção e produção. Abaixo está um diagrama de como esses serviços funcionam. Deixe-me esclarecer desde já que alguns detalhes foram simplificados para evitar complexidade desnecessária.

Apache NIFI - Uma breve visão geral das oportunidades na prática

Aprofundamento Técnico

Ao planejar uma solução para o problema, decidimos primeiro fazer aplicações em Java utilizando o framework Spring, balanceador Nginx, banco de dados Postgres e outras coisas técnicas e não tão técnicas. Como o tempo de desenvolvimento de uma solução técnica nos permitiu considerar outras abordagens para resolver este problema, nosso olhar recaiu sobre a tecnologia Apache NIFI, que está na moda em certos círculos. Direi desde já que esta tecnologia nos permitiu perceber estes 3 serviços. Este artigo descreverá o desenvolvimento de um serviço de transporte de arquivos e de um serviço de transferência de dados para o consumidor, mas se o artigo for útil, escreverei sobre o serviço de atualização de dados no banco de dados.

O que é isto

NIFI é uma arquitetura distribuída para rápido carregamento e processamento paralelo de dados, um grande número de plugins para fontes e transformações, versionamento de configurações e muito mais. Um bom bônus é que é muito fácil de usar. Processos triviais como getFile, sendHttpRequest e outros podem ser representados como quadrados. Cada quadrado representa um processo, cuja interação pode ser vista na figura abaixo. Documentação mais detalhada sobre interações de configuração de processos foi escrita aqui , para quem fala russo - aqui. A documentação descreve perfeitamente como descompactar e executar o NIFI, bem como criar processos, também conhecidos como quadrados
A ideia de escrever um artigo nasceu após uma longa busca e estruturação das informações recebidas em algo consciente, bem como da vontade de facilitar um pouco a vida dos futuros desenvolvedores.

Exemplo

Um exemplo de como os quadrados interagem entre si é considerado. O esquema geral é bastante simples: Recebemos uma solicitação HTTP (em teoria, com um arquivo no corpo da solicitação. Para demonstrar as capacidades do NIFI, neste exemplo a solicitação inicia o processo de recebimento de um arquivo do armazenamento local de arquivos ), em seguida, enviamos de volta uma resposta informando que a solicitação foi recebida, paralelamente ao processo de recebimento de um arquivo de FH e depois ao processo de transferência via FTP para FH. Vale esclarecer que os processos interagem entre si por meio do chamado flowFile. Esta é a entidade base no NIFI que armazena atributos e conteúdo. Conteúdo são os dados representados pelo arquivo de fluxo. Ou seja, grosso modo, se você receber um arquivo de um quadrado e transferi-lo para outro, o conteúdo será o seu arquivo.

Apache NIFI - Uma breve visão geral das oportunidades na prática

Como você pode ver, esta imagem mostra o processo geral. HandleHttpRequest - aceita solicitações, ReplaceText - gera um corpo de resposta, HandleHttpResponse - envia uma resposta. FetchFile - recebe um arquivo de um armazenamento de arquivos, transfere-o para o quadrado PutSftp - coloca esse arquivo no FTP, no endereço especificado. Agora mais sobre esse processo.

Neste caso, request é o começo de tudo. Vejamos seus parâmetros de configuração.

Apache NIFI - Uma breve visão geral das oportunidades na prática

Tudo aqui é bastante trivial, com exceção do StandardHttpContextMap - este é um tipo de serviço que permite enviar e receber solicitações. Mais detalhadamente e até com exemplos, você pode ver - aqui

A seguir, vamos dar uma olhada nos parâmetros de configuração do PlaceText do quadrado. Vale a pena prestar atenção em ReplacementValue - é o que será retornado ao usuário em forma de resposta. Nas configurações você pode ajustar o nível de registro, você pode ver os logs {onde você descompactou o nifi}/nifi-1.9.2/logs, também existem parâmetros de falha/sucesso - com base nesses parâmetros você pode regular o processo como um todo . Ou seja, no caso de processamento de texto bem-sucedido, será chamado o processo de envio de resposta ao usuário e, em outro caso, simplesmente registraremos o processo malsucedido.

Apache NIFI - Uma breve visão geral das oportunidades na prática

Não há nada particularmente interessante nas propriedades HandleHttpResponse, exceto o status quando uma resposta é criada com sucesso.

Apache NIFI - Uma breve visão geral das oportunidades na prática

Resolvemos a solicitação e a resposta - vamos receber o arquivo e colocá-lo no servidor FTP. FetchFile - recebe um arquivo no caminho especificado nas configurações e passa para o próximo processo.

Apache NIFI - Uma breve visão geral das oportunidades na prática

E então o quadrado PutSftp - coloca o arquivo no armazenamento de arquivos. Podemos ver os parâmetros de configuração abaixo.

Apache NIFI - Uma breve visão geral das oportunidades na prática

Vale atentar para o fato de que cada quadrado é um processo distinto que deve ser lançado. Vimos o exemplo mais simples que não requer nenhuma personalização complexa. A seguir veremos o processo um pouco mais complicado, onde escreveremos um pouco nas ranhuras.

Exemplo mais complexo

O serviço de transferência de dados ao consumidor acabou sendo um pouco mais complicado devido ao processo de modificação da mensagem SOAP. O processo geral é mostrado na figura abaixo.

Apache NIFI - Uma breve visão geral das oportunidades na prática

Aqui a ideia também não é particularmente complicada: recebemos uma solicitação do consumidor de que ele precisava de dados, enviamos uma resposta informando que havia recebido uma mensagem, iniciamos o processo de recebimento do arquivo de resposta, depois editamos com uma certa lógica, e então transferiu o arquivo para o consumidor na forma de uma mensagem SOAP para o servidor.

Acho que não há necessidade de descrever novamente os quadrados que vimos acima - vamos direto aos novos. Se você precisar editar algum arquivo e os quadrados comuns do tipo ReplaceText não forem adequados, você terá que escrever seu próprio script. Isso pode ser feito usando o quadrado ExecuteGroogyScript. Suas configurações são apresentadas a seguir.

Apache NIFI - Uma breve visão geral das oportunidades na prática

Existem duas opções para carregar o script neste quadrado. A primeira é baixando um arquivo com um script. A segunda é inserindo um script em scriptBody. Pelo que eu sei, o quadrado executeScript suporta várias linguagens - uma delas é bacana. Vou decepcionar os desenvolvedores de Java - você não pode escrever scripts em Java nesses quadrados. Para quem realmente deseja, é necessário criar seu próprio quadrado personalizado e adicioná-lo ao sistema NIFI. Toda esta operação é acompanhada por uma longa dança com pandeiro, da qual não trataremos neste artigo. Eu escolhi a linguagem bacana. Abaixo está um script de teste que simplesmente atualiza incrementalmente o ID em uma mensagem SOAP. É importante notar. Você pega o arquivo do flowFile e atualiza ele, não esquece que precisa colocar ele de volta lá, atualizado. Também é importante notar que nem todas as bibliotecas estão incluídas. Pode acontecer que você ainda precise importar uma das bibliotecas. Outra desvantagem é que o script neste quadrado é bastante difícil de depurar. Existe uma maneira de conectar-se à JVM NIFI e iniciar o processo de depuração. Pessoalmente, lancei um aplicativo local e simulei o recebimento de um arquivo da sessão. Eu também fiz depuração localmente. Erros que aparecem ao carregar um script são bastante fáceis para o Google e são gravados pelo próprio NIFI no log.

import org.apache.commons.io.IOUtils
import groovy.xml.XmlUtil
import java.nio.charset.*
import groovy.xml.StreamingMarkupBuilder

def flowFile = session.get()
if (!flowFile) return
try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        String result = IOUtils.toString(inputStream, "UTF-8");
        def recordIn = new XmlSlurper().parseText(result)
        def element = recordIn.depthFirst().find {
            it.name() == 'id'
        }

        def newId = Integer.parseInt(element.toString()) + 1
        def recordOut = new XmlSlurper().parseText(result)
        recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId

        def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString()
        outputStream.write(res.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
     session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
    log.error("Error during processing of validate.groovy", e)
    session.transfer(flowFile, REL_FAILURE)
}

Na verdade, é aqui que termina a customização do quadrado. Em seguida, o arquivo atualizado é transferido para a praça, que é responsável por enviar o arquivo ao servidor. Abaixo estão as configurações para este quadrado.

Apache NIFI - Uma breve visão geral das oportunidades na prática

Descrevemos o método pelo qual uma mensagem SOAP será transmitida. Nós escrevemos onde. Em seguida, você precisa indicar que se trata de SOAP.

Apache NIFI - Uma breve visão geral das oportunidades na prática

Adicione diversas propriedades como host e action (soapAction). Nós salvamos e verificamos. Você pode ver mais detalhes sobre como enviar solicitações SOAP aqui

Examinamos várias opções para usar processos NIFI. Como eles interagem e qual o seu real benefício? Os exemplos considerados são de teste e são ligeiramente diferentes do que realmente acontece em combate. Espero que este artigo seja um pouco útil para desenvolvedores. Obrigado pela sua atenção. Se você tiver alguma dúvida, escreva. Vou tentar responder.

Fonte: habr.com

Adicionar um comentário