Olá a todos!
A tarefa é a seguinte - há um fluxo mostrado na imagem acima, que precisa ser implementado em N servidores com
NiFi Site to Site (S2S) é uma forma segura e altamente personalizável de transferir dados entre instâncias NiFi. Veja como funciona o S2S
Quando se trata de transferência de dados usando S2S, uma instância é chamada de cliente, a segunda é um servidor. O cliente envia dados, o servidor os recebe. Duas maneiras de configurar a transferência de dados entre eles:
- Empurrar. Os dados são enviados da instância do cliente usando um Remote Process Group (RPG). Na instância do servidor, os dados são recebidos usando a porta de entrada
- Puxe. O servidor recebe dados através do RPG, o cliente envia através da porta de saída.
O fluxo para rolamento é armazenado no Registro Apache.
Apache NiFi Registry é um subprojeto do Apache NiFi que fornece uma ferramenta de armazenamento e controle de versão de fluxo. Uma espécie de GIT. Informações sobre instalação, configuração e trabalho com o registro podem ser encontradas em
No início, quando N é um número pequeno, o fluxo é entregue e atualizado manualmente em um tempo razoável.
Mas à medida que N cresce, há mais problemas:
- leva mais tempo para atualizar o fluxo. Você precisa fazer login em todos os servidores
- há erros ao atualizar modelos. Aqui eles atualizaram, mas aqui eles esqueceram
- erro humano ao realizar um grande número de operações semelhantes
Tudo isso nos leva ao fato de que precisamos automatizar o processo. Tentei as seguintes maneiras de resolver esse problema:
- Use MiNiFi em vez de NiFi
- CLI NiFi
- NiPyAPI
Usando MiNiFi
Outro subprojeto, MiNiFi C2 Server, ajudará a resolver este problema. Este produto pretende ser o ponto central na arquitetura de implantação. Como configurar o ambiente - descrito em
A opção descrita no artigo acima funciona e não é difícil de implementar, mas não devemos esquecer o seguinte:
- minifi não possui todos os processadores da nifi
- As versões de CPU no Minifi ficam atrás das versões de CPU no NiFi.
No momento em que este artigo foi escrito, a versão mais recente do NiFi era 1.9.2. A versão do processador da versão mais recente do MiNiFi é 1.7.0. Processadores podem ser adicionados ao MiNiFi, mas devido a discrepâncias de versão entre os processadores NiFi e MiNiFi, isso pode não funcionar.
CLI NiFi
A julgar pelo
Inicie o utilitário
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Para que possamos carregar o fluxo necessário do registro, precisamos conhecer os identificadores da cesta (identificador do bucket) e do próprio fluxo (identificador do fluxo). Esses dados podem ser obtidos através do cli ou na interface web do registro NiFi. A interface da web é semelhante a esta:
Usando a CLI, você faz isso:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Execute o grupo de processos de importação do registro:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Um ponto importante é que qualquer instância nifi pode ser especificada como o host no qual implantamos o grupo de processos.
Grupo de processos adicionado com processadores parados, eles precisam ser iniciados
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Ótimo, os processadores foram iniciados. Porém, de acordo com os termos da tarefa, precisamos de instâncias NiFi para enviar dados para outras instâncias. Vamos supor que você escolheu o método Push para transferir dados para o servidor. Para organizar a transferência de dados, você precisa habilitar a transferência de dados no Remote Process Group (RPG) adicionado, que já está incluído em nosso fluxo.
Na documentação da CLI e de outras fontes, não encontrei uma maneira de habilitar a transferência de dados. Se você sabe como fazer isso, escreva nos comentários.
Já que temos o bash e estamos prontos para ir até o fim, vamos dar um jeito! Você pode usar a API NiFi para resolver este problema. Vamos usar o seguinte método, pegamos o ID dos exemplos acima (no nosso caso é 7f522a13-016e-1000-e504-d5b15587f2f3). Descrição dos métodos da API NiFi
No corpo, você precisa passar JSON, no seguinte formato:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parâmetros que devem ser preenchidos para “funcionar”:
estado — status da transferência de dados. Disponível TRANSMITTING para ativar a transferência de dados, STOPPED para desativar
versão - versão do processador
a versão terá como padrão 0 quando criada, mas esses parâmetros podem ser obtidos usando o método
Para os amantes de scripts bash, esse método pode parecer adequado, mas é difícil para mim - os scripts bash não são meus favoritos. A próxima forma é mais interessante e conveniente na minha opinião.
NiPyAPI
NiPyAPI é uma biblioteca Python para interagir com instâncias NiFi.
Nosso script para implementar a configuração é um programa em Python. Vamos passar para a codificação.
Configure configurações para trabalhos futuros. Precisaremos dos seguintes parâmetros:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
A seguir irei inserir os nomes dos métodos desta biblioteca, que são descritos
Conectamos o registro à instância nifi usando
nipyapi.versioning.create_registry_client
Nesta etapa você também pode adicionar uma verificação de que o registro já foi adicionado à instância, para isso você pode usar o método
nipyapi.versioning.list_registry_clients
Encontramos o balde para pesquisa adicional de fluxo na cesta
nipyapi.versioning.get_registry_bucket
De acordo com o balde encontrado, estamos procurando fluxo
nipyapi.versioning.get_flow_in_bucket
A seguir, é importante entender se este grupo de processos já foi adicionado. O grupo de processos é colocado por coordenadas e pode surgir uma situação quando um segundo é sobreposto a um. Eu verifiquei, pode ser 🙂 Para obter todo o grupo de processos adicionado, use o método
nipyapi.canvas.list_all_process_groups
e então podemos pesquisar, por exemplo, por nome.
Não vou descrever o processo de atualização do template, apenas direi que se forem adicionados processadores na nova versão do template, não haverá problemas com a presença de mensagens nas filas. Mas se os processadores forem removidos, podem surgir problemas (o nifi não permite a remoção do processador se uma fila de mensagens se acumular na frente dele). Se você estiver interessado em saber como resolvi esse problema - escreva para mim, por favor, discutiremos esse ponto. Contatos no final do artigo. Vamos passar para a etapa de adição de um grupo de processos.
Ao depurar o script, me deparei com um recurso de que a versão mais recente do fluxo nem sempre é acessada, então recomendo que você primeiro esclareça esta versão:
nipyapi.versioning.get_latest_flow_ver
Implantar grupo de processos:
nipyapi.versioning.deploy_flow_version
Iniciamos os processadores:
nipyapi.canvas.schedule_process_group
No bloco sobre CLI, foi escrito que a transferência de dados não é habilitada automaticamente no grupo de processos remotos? Ao implementar o script, também encontrei esse problema. Naquela época, não consegui iniciar a transferência de dados usando a API e decidi escrever para o desenvolvedor da biblioteca NiPyAPI e pedir conselhos/ajuda. O desenvolvedor me respondeu, discutimos o problema e ele escreveu que precisava de tempo para “verificar alguma coisa”. E agora, alguns dias depois, chega um e-mail no qual está escrita uma função Python que resolve meu problema de inicialização !!! Naquela época, a versão do NiPyAPI era 0.13.3 e, claro, não havia nada parecido nela. Mas na versão 0.14.0, lançada recentemente, esta função já foi incluída na biblioteca. Encontrar
nipyapi.canvas.set_remote_process_group_transmission
Assim, com a ajuda da biblioteca NiPyAPI, conectamos o registro, aumentamos o fluxo e até iniciamos os processadores e a transferência de dados. Então você pode pentear o código, adicionar todos os tipos de verificações, registros e pronto. Mas essa é uma história completamente diferente.
Das opções de automação que considerei, esta me pareceu a mais eficiente. Em primeiro lugar, este ainda é um código python, no qual você pode incorporar código de programa auxiliar e aproveitar todos os benefícios de uma linguagem de programação. Em segundo lugar, o projeto NiPyAPI está em desenvolvimento ativo e em caso de problemas você pode escrever para o desenvolvedor. Em terceiro lugar, o NiPyAPI ainda é uma ferramenta mais flexível para interagir com o NiFi na resolução de problemas complexos. Por exemplo, para determinar se as filas de mensagens estão atualmente vazias no fluxo e se é possível atualizar o grupo de processos.
Isso é tudo. Descrevi três abordagens para automatizar a entrega de fluxo em NiFi, as armadilhas que um desenvolvedor pode encontrar e forneci um código funcional para automatizar a entrega. Se você está tão interessado neste tópico quanto eu -
Fonte: habr.com