Automação de entrega de fluxo no Apache NiFi

Olá a todos!

Automação de entrega de fluxo no Apache NiFi

A tarefa é a seguinte - há um fluxo mostrado na imagem acima, que precisa ser implementado em N servidores com Apache NiFiName. Teste de fluxo - um arquivo está sendo gerado e enviado para outra instância NiFi. A transferência de dados ocorre usando o protocolo NiFi Site to Site.

NiFi Site to Site (S2S) é uma forma segura e altamente personalizável de transferir dados entre instâncias NiFi. Veja como funciona o S2S documentação e é importante lembrar de configurar sua instância NiFi para permitir que S2S veja aqui.

Quando se trata de transferência de dados usando S2S, uma instância é chamada de cliente, a segunda é um servidor. O cliente envia dados, o servidor os recebe. Duas maneiras de configurar a transferência de dados entre eles:

  1. Empurrar. Os dados são enviados da instância do cliente usando um Remote Process Group (RPG). Na instância do servidor, os dados são recebidos usando a porta de entrada
  2. Puxe. O servidor recebe dados através do RPG, o cliente envia através da porta de saída.


O fluxo para rolamento é armazenado no Registro Apache.

Apache NiFi Registry é um subprojeto do Apache NiFi que fornece uma ferramenta de armazenamento e controle de versão de fluxo. Uma espécie de GIT. Informações sobre instalação, configuração e trabalho com o registro podem ser encontradas em documentação oficial. O fluxo para armazenamento é combinado em um grupo de processos e armazenado no registro neste formulário. Voltaremos a isso mais adiante neste artigo.

No início, quando N é um número pequeno, o fluxo é entregue e atualizado manualmente em um tempo razoável.

Mas à medida que N cresce, há mais problemas:

  1. leva mais tempo para atualizar o fluxo. Você precisa fazer login em todos os servidores
  2. há erros ao atualizar modelos. Aqui eles atualizaram, mas aqui eles esqueceram
  3. erro humano ao realizar um grande número de operações semelhantes

Tudo isso nos leva ao fato de que precisamos automatizar o processo. Tentei as seguintes maneiras de resolver esse problema:

  1. Use MiNiFi em vez de NiFi
  2. CLI NiFi
  3. NiPyAPI

Usando MiNiFi

ApacheMiNify é um subprojeto do Apache NiFi. MiNiFy é um agente compacto que usa os mesmos processadores do NiFi, permitindo criar o mesmo fluxo do NiFi. A leveza do agente é alcançada, entre outras coisas, pelo fato do MiNiFy não possuir interface gráfica para configuração do fluxo. A falta de interface gráfica do MiNiFy significa que é necessário resolver o problema de entrega de fluxo no minifi. Como o MiNiFy é usado ativamente na IOT, há muitos componentes e o processo de entrega do fluxo às instâncias finais do minifi deve ser automatizado. Uma tarefa familiar, certo?

Outro subprojeto, MiNiFi C2 Server, ajudará a resolver este problema. Este produto pretende ser o ponto central na arquitetura de implantação. Como configurar o ambiente - descrito em Este artigo em Habré e a informação é suficiente para resolver o problema. MiNiFi em conjunto com o servidor C2 atualiza automaticamente sua configuração. A única desvantagem dessa abordagem é que você precisa criar modelos no servidor C2; um simples commit no registro não é suficiente.

A opção descrita no artigo acima funciona e não é difícil de implementar, mas não devemos esquecer o seguinte:

  1. minifi não possui todos os processadores da nifi
  2. As versões de CPU no Minifi ficam atrás das versões de CPU no NiFi.

No momento em que este artigo foi escrito, a versão mais recente do NiFi era 1.9.2. A versão do processador da versão mais recente do MiNiFi é 1.7.0. Processadores podem ser adicionados ao MiNiFi, mas devido a discrepâncias de versão entre os processadores NiFi e MiNiFi, isso pode não funcionar.

CLI NiFi

A julgar pelo Descrição ferramenta no site oficial, é uma ferramenta para automatizar a interação entre NiFI e NiFi Registry na área de entrega de fluxo ou gestão de processos. Baixe esta ferramenta para começar. por isso.

Inicie o utilitário

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Para que possamos carregar o fluxo necessário do registro, precisamos conhecer os identificadores da cesta (identificador do bucket) e do próprio fluxo (identificador do fluxo). Esses dados podem ser obtidos através do cli ou na interface web do registro NiFi. A interface da web é semelhante a esta:

Automação de entrega de fluxo no Apache NiFi

Usando a CLI, você faz isso:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Execute o grupo de processos de importação do registro:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Um ponto importante é que qualquer instância nifi pode ser especificada como o host no qual implantamos o grupo de processos.

Grupo de processos adicionado com processadores parados, eles precisam ser iniciados

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Ótimo, os processadores foram iniciados. Porém, de acordo com os termos da tarefa, precisamos de instâncias NiFi para enviar dados para outras instâncias. Vamos supor que você escolheu o método Push para transferir dados para o servidor. Para organizar a transferência de dados, você precisa habilitar a transferência de dados no Remote Process Group (RPG) adicionado, que já está incluído em nosso fluxo.

Automação de entrega de fluxo no Apache NiFi

Na documentação da CLI e de outras fontes, não encontrei uma maneira de habilitar a transferência de dados. Se você sabe como fazer isso, escreva nos comentários.

Já que temos o bash e estamos prontos para ir até o fim, vamos dar um jeito! Você pode usar a API NiFi para resolver este problema. Vamos usar o seguinte método, pegamos o ID dos exemplos acima (no nosso caso é 7f522a13-016e-1000-e504-d5b15587f2f3). Descrição dos métodos da API NiFi aqui.

Automação de entrega de fluxo no Apache NiFi
No corpo, você precisa passar JSON, no seguinte formato:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parâmetros que devem ser preenchidos para “funcionar”:
estado — status da transferência de dados. Disponível TRANSMITTING para ativar a transferência de dados, STOPPED para desativar
versão - versão do processador

a versão terá como padrão 0 quando criada, mas esses parâmetros podem ser obtidos usando o método

Automação de entrega de fluxo no Apache NiFi

Para os amantes de scripts bash, esse método pode parecer adequado, mas é difícil para mim - os scripts bash não são meus favoritos. A próxima forma é mais interessante e conveniente na minha opinião.

NiPyAPI

NiPyAPI é uma biblioteca Python para interagir com instâncias NiFi. Página de documentação contém as informações necessárias para trabalhar com a biblioteca. O início rápido é descrito em projeto no github.

Nosso script para implementar a configuração é um programa em Python. Vamos passar para a codificação.
Configure configurações para trabalhos futuros. Precisaremos dos seguintes parâmetros:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

A seguir irei inserir os nomes dos métodos desta biblioteca, que são descritos aqui.

Conectamos o registro à instância nifi usando

nipyapi.versioning.create_registry_client

Nesta etapa você também pode adicionar uma verificação de que o registro já foi adicionado à instância, para isso você pode usar o método

nipyapi.versioning.list_registry_clients

Encontramos o balde para pesquisa adicional de fluxo na cesta

nipyapi.versioning.get_registry_bucket

De acordo com o balde encontrado, estamos procurando fluxo

nipyapi.versioning.get_flow_in_bucket

A seguir, é importante entender se este grupo de processos já foi adicionado. O grupo de processos é colocado por coordenadas e pode surgir uma situação quando um segundo é sobreposto a um. Eu verifiquei, pode ser 🙂 Para obter todo o grupo de processos adicionado, use o método

nipyapi.canvas.list_all_process_groups

e então podemos pesquisar, por exemplo, por nome.

Não vou descrever o processo de atualização do template, apenas direi que se forem adicionados processadores na nova versão do template, não haverá problemas com a presença de mensagens nas filas. Mas se os processadores forem removidos, podem surgir problemas (o nifi não permite a remoção do processador se uma fila de mensagens se acumular na frente dele). Se você estiver interessado em saber como resolvi esse problema - escreva para mim, por favor, discutiremos esse ponto. Contatos no final do artigo. Vamos passar para a etapa de adição de um grupo de processos.

Ao depurar o script, me deparei com um recurso de que a versão mais recente do fluxo nem sempre é acessada, então recomendo que você primeiro esclareça esta versão:

nipyapi.versioning.get_latest_flow_ver

Implantar grupo de processos:

nipyapi.versioning.deploy_flow_version

Iniciamos os processadores:

nipyapi.canvas.schedule_process_group

No bloco sobre CLI, foi escrito que a transferência de dados não é habilitada automaticamente no grupo de processos remotos? Ao implementar o script, também encontrei esse problema. Naquela época, não consegui iniciar a transferência de dados usando a API e decidi escrever para o desenvolvedor da biblioteca NiPyAPI e pedir conselhos/ajuda. O desenvolvedor me respondeu, discutimos o problema e ele escreveu que precisava de tempo para “verificar alguma coisa”. E agora, alguns dias depois, chega um e-mail no qual está escrita uma função Python que resolve meu problema de inicialização !!! Naquela época, a versão do NiPyAPI era 0.13.3 e, claro, não havia nada parecido nela. Mas na versão 0.14.0, lançada recentemente, esta função já foi incluída na biblioteca. Encontrar

nipyapi.canvas.set_remote_process_group_transmission

Assim, com a ajuda da biblioteca NiPyAPI, conectamos o registro, aumentamos o fluxo e até iniciamos os processadores e a transferência de dados. Então você pode pentear o código, adicionar todos os tipos de verificações, registros e pronto. Mas essa é uma história completamente diferente.

Das opções de automação que considerei, esta me pareceu a mais eficiente. Em primeiro lugar, este ainda é um código python, no qual você pode incorporar código de programa auxiliar e aproveitar todos os benefícios de uma linguagem de programação. Em segundo lugar, o projeto NiPyAPI está em desenvolvimento ativo e em caso de problemas você pode escrever para o desenvolvedor. Em terceiro lugar, o NiPyAPI ainda é uma ferramenta mais flexível para interagir com o NiFi na resolução de problemas complexos. Por exemplo, para determinar se as filas de mensagens estão atualmente vazias no fluxo e se é possível atualizar o grupo de processos.

Isso é tudo. Descrevi três abordagens para automatizar a entrega de fluxo em NiFi, as armadilhas que um desenvolvedor pode encontrar e forneci um código funcional para automatizar a entrega. Se você está tão interessado neste tópico quanto eu - escrever!

Fonte: habr.com

Adicionar um comentário