Flow Delivery Automation in Apache NiFi

Hi all!

[Figure: the test flow]

The task is as follows: the flow shown in the picture above needs to be rolled out to N servers running Apache NiFi. The test flow generates a file and sends it to another NiFi instance. Data is transferred using the NiFi Site-to-Site protocol.

NiFi Site-to-Site (S2S) is a secure, highly configurable way to transfer data between NiFi instances. How S2S works is described in the documentation. Remember that each NiFi instance must be configured to allow S2S; this is also covered in the documentation.

When data is transferred using S2S, one instance is called the client and the other the server. The client sends data, the server receives it. There are two ways to set up data transfer between them:

  1. Push. Data is sent from the client instance using a Remote Process Group (RPG). On the server instance, data is received using an Input Port.
  2. Pull. The server receives data using an RPG, and the client sends it using an Output Port.


The flow to be rolled out is stored in Apache NiFi Registry.

Apache NiFi Registry is a subproject of Apache NiFi that provides storage and versioning for flows. A sort of Git for flows. Information about installing, configuring and working with the registry can be found in the official documentation. To be stored, a flow is wrapped in a process group and saved to the registry in that form. We will return to this later in the article.

At the start, when N is small, the flow can be delivered and updated by hand in a reasonable time.

But as N grows, there are more problems:

  1. updating the flow takes more time, because you have to visit every server
  2. flows are updated inconsistently: updated on one server, forgotten on another
  3. human error creeps in when many similar operations are performed by hand

All of this means the process has to be automated. I have tried the following ways to solve this problem:

  1. Use MiNiFi instead of NiFi
  2. NiFi CLI
  3. NiPyAPI

Using MiNiFi

Apache MiNiFi is a subproject of Apache NiFi. MiNiFi is a compact agent that uses the same processors as NiFi, so you can build the same flows as in NiFi. The agent is lightweight partly because it has no graphical interface for configuring flows. That lack of a graphical interface means flow delivery to MiNiFi has to be solved some other way. Since MiNiFi is widely used in IoT, there are many agents, and delivering flows to the end MiNiFi instances must be automated. A familiar task, right?

Another subproject, MiNiFi C2 Server, helps solve this problem. It is intended to be the central point of the deployment architecture. How to configure the environment is described in this article on Habr, and that information is enough to solve the problem. MiNiFi together with the C2 server updates its configuration automatically. The only drawback of this approach is that you have to create templates on the C2 Server; a simple commit to the registry is not enough.

The option described in that article works and is not difficult to implement, but keep the following in mind:

  1. MiNiFi does not have all the processors that NiFi has
  2. processor versions in MiNiFi lag behind processor versions in NiFi

At the time of writing, the latest version of NiFi is 1.9.2, while the processors in the latest MiNiFi release are at version 1.7.0. Processors can be added to MiNiFi, but because of the version gap between NiFi and MiNiFi processors, this may not work.

NiFi CLI

Judging by the tool's description on the official website, this is a tool for automating the interaction between NiFi and NiFi Registry for flow delivery and process management. To get started, download the tool from the official website.

Run the utility:

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

To load the required flow from the registry, we need to know the bucket identifier and the flow identifier. These can be obtained either through the CLI or in the NiFi Registry web interface. The web interface looks like this:

[Figure: the NiFi Registry web interface]

Using the CLI, it is done like this:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Import the process group from the registry:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

An important point: any NiFi instance can be specified as the host onto which the process group is rolled out.

The process group is added with its processors stopped, so they need to be started:

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
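
With N servers, the two commands above have to be repeated for every host. A minimal sketch of generating them in Python follows. It assumes the CLI can be invoked non-interactively with the command passed as arguments; the host names and helper function names are hypothetical, while the flags are the ones used in the session above.

```python
from typing import List

CLI = "./bin/cli.sh"  # path to the NiFi CLI launcher, as used above


def pg_import_cmd(nifi_url: str, bucket_id: str, flow_id: str, version: int) -> List[str]:
    """Build the argument list for `nifi pg-import` against one NiFi instance."""
    return [CLI, "nifi", "pg-import",
            "-b", bucket_id, "-f", flow_id, "-fv", str(version),
            "-u", nifi_url]


def pg_start_cmd(nifi_url: str, pg_id: str) -> List[str]:
    """Build the argument list for `nifi pg-start` on an imported process group."""
    return [CLI, "nifi", "pg-start", "-pgid", pg_id, "-u", nifi_url]


if __name__ == "__main__":
    # Hypothetical host list; replace with the real N servers.
    hosts = ["http://nifi-1:8080", "http://nifi-2:8080"]
    for host in hosts:
        cmd = pg_import_cmd(host,
                            "709d387a-9ce9-4535-8546-3621efe38e96",
                            "d27af00a-5b47-4910-89cd-9c664cd91e85",
                            1)
        print(" ".join(cmd))
```

Each list can be handed to `subprocess.run`; the process group ID printed by `pg-import` is then the `pg_id` argument for `pg_start_cmd`.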

Great, the processors have started. However, according to the task, the NiFi instances must send data to other instances. Let's assume the Push method was chosen to transfer data to the server. For data to flow, transmission must be enabled (Enable transmitting) on the Remote Process Group (RPG) that is already part of our flow.

[Figure: the "Enable transmitting" option on the Remote Process Group]

I did not find a way to enable data transfer in the CLI documentation or in other sources. If you know how to do this, please write in the comments.

Since we have bash and are prepared to go all the way, we will find a way out! The NiFi REST API can solve this problem. We will use the following method, taking the ID from the examples above (in our case, 7f522a13-016e-1000-e504-d5b15587f2f3). The NiFi API methods are described here.

[Figure: the NiFi REST API method used to change the transmission state]

In the request body, you need to pass JSON of the following form:

{
    "revision": {
        "clientId": "value",
        "version": 0,
        "lastModifier": "value"
    },
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parameters that must be filled in for the request to work:
state - the data transfer state: TRANSMITTING to enable data transfer, STOPPED to disable it
version - the current revision version of the component being updated

version is 0 for a newly created component; the current values can be obtained using this method:

[Figure: the NiFi REST API method used to read the current values]
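
The same two REST calls can be sketched with the Python standard library instead of bash. The endpoint paths below are assumptions, since the screenshots naming the methods did not survive: `GET /remote-process-groups/{id}` to read the revision and `PUT /remote-process-groups/{id}/run-status` to change the state. Here `rpg_id` is assumed to be the ID of the Remote Process Group itself, and the function names are hypothetical.

```python
import json
import urllib.request

NIFI_API = "http://nifi:8080/nifi-api"  # same instance as in the CLI examples


def run_status_body(version, transmitting=True):
    """Build the request body; `version` is the RPG's current revision version."""
    return {
        "revision": {"version": version},
        "state": "TRANSMITTING" if transmitting else "STOPPED",
        "disconnectedNodeAcknowledged": False,
    }


def get_revision_version(rpg_id):
    # ASSUMPTION: GET /remote-process-groups/{id} returns the entity with its revision.
    with urllib.request.urlopen(f"{NIFI_API}/remote-process-groups/{rpg_id}") as resp:
        return json.load(resp)["revision"]["version"]


def set_transmission(rpg_id, transmitting=True):
    # ASSUMPTION: PUT /remote-process-groups/{id}/run-status accepts the body above.
    body = run_status_body(get_revision_version(rpg_id), transmitting)
    request = urllib.request.Request(
        f"{NIFI_API}/remote-process-groups/{rpg_id}/run-status",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(request) as resp:
        return json.load(resp)
```

Reading the revision immediately before the update matters: if the version sent does not match the one NiFi currently holds, the request is expected to be rejected.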

For lovers of bash scripts this method may seem suitable, but it is hard going for me: bash scripts are not my favorite. The next way is, in my opinion, more interesting and more convenient.

NiPyAPI

NiPyAPI is a Python library for interacting with NiFi instances. The documentation page contains the information needed to work with the library. A quick start is described in the project on GitHub.

Our script for rolling out the configuration is a Python program. Let's move on to coding.
First, set up the configuration. We will need the following parameters:

import nipyapi

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api'  # path to the nifi-api of the instance where the process group is deployed
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api'  # path to the registry's nifi-registry-api
nipyapi.config.registry_name = 'MyBeutifulRegistry'  # name the registry will have inside the NiFi instance
nipyapi.config.bucket_name = 'BucketName'  # name of the bucket the flow is pulled from
nipyapi.config.flow_name = 'FlowName'  # name of the flow to pull

Below I list the names of the library methods used at each step; they are described in the documentation.

We connect the registry to the NiFi instance using

nipyapi.versioning.create_registry_client

At this step you can also check whether the registry has already been added to the instance, using the method

nipyapi.versioning.list_registry_clients
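
A minimal sketch of that check, assuming `list_registry_clients` returns an entity whose `registries` list holds clients exposing `component.name`, and that `create_registry_client` takes a name, a URI and a description. The helper name is hypothetical, and the `nipyapi.versioning` module is passed in as an argument so the helper can be tried without a live NiFi.

```python
def ensure_registry_client(versioning, name, uri, description=""):
    """Return the registry client called `name`, creating it if it is absent.

    `versioning` is expected to be the `nipyapi.versioning` module.
    """
    # ASSUMPTION: the returned entity exposes the clients as `.registries`.
    existing = versioning.list_registry_clients().registries or []
    for client in existing:
        if client.component.name == name:
            return client  # already connected, nothing to create
    return versioning.create_registry_client(name, uri, description)


# Usage against a real instance (not executed here):
#   import nipyapi
#   client = ensure_registry_client(
#       nipyapi.versioning, 'MyBeutifulRegistry', 'http://nifi-registry:18080')
```

This keeps the script safe to re-run: a second run finds the existing client instead of trying to create a duplicate.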

We find the bucket in which to look for the flow

nipyapi.versioning.get_registry_bucket

In the bucket we found, we look for the flow

nipyapi.versioning.get_flow_in_bucket

Next, it is important to find out whether this process group has already been added. A process group is placed by coordinates, so a second one can end up superimposed on top of the first. I checked, it can happen 🙂 To get all the process groups that have been added, use the method

nipyapi.canvas.list_all_process_groups

and then we can search, for example, by name.
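
A short sketch of that lookup, assuming each entity returned by `list_all_process_groups` exposes its name as `component.name`. The helper name is hypothetical, and `canvas` stands for the `nipyapi.canvas` module.

```python
def find_process_groups(canvas, name):
    """Return every process group on the canvas whose name equals `name`."""
    # ASSUMPTION: entities expose the group name as `.component.name`.
    return [pg for pg in canvas.list_all_process_groups()
            if pg.component.name == name]


# Usage against a real instance (not executed here):
#   import nipyapi
#   if find_process_groups(nipyapi.canvas, 'FlowName'):
#       ...  # already deployed: update instead of deploying a second copy
```

An empty result means the flow has not been deployed to this instance yet, so it is safe to add it.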

I will not describe the process of updating the flow in detail. I will only say that if the new version adds processors, messages sitting in queues cause no problems. But if processors are removed, problems may arise: NiFi does not allow a processor to be removed while messages are queued in front of it. If you are interested in how I solved this problem, please write to me and we will discuss it. Contacts are at the end of the article. Let's move on to the step of adding a process group.

When debugging the script, I found that the latest version of the flow is not always pulled, so I recommend determining that version explicitly first:

nipyapi.versioning.get_latest_flow_ver

Deploy the process group:

nipyapi.versioning.deploy_flow_version

We start the processors:

nipyapi.canvas.schedule_process_group

Remember that in the CLI section data transfer was not enabled automatically on the remote process group? I ran into the same problem when implementing the script. At that time I could not start data transfer using the API, so I decided to write to the developer of the NiPyAPI library and ask for advice and help. The developer answered, we discussed the problem, and he wrote that he needed time to "check something". A couple of days later an email arrived containing a Python function that solved my startup problem! At that time the NiPyAPI version was 0.13.3 and, of course, it contained nothing of the kind. But in version 0.14.0, released quite recently, this function is already included in the library. Meet

nipyapi.canvas.set_remote_process_group_transmission
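
Put together, the deployment steps above might look like the sketch below. It is a rough outline under stated assumptions, not the author's script: the function name `deploy_flow` and its parameters are hypothetical, `api` stands for the imported `nipyapi` module, the registry client is assumed to exist already, and the call signatures follow my reading of the NiPyAPI documentation, so check them against the version you use.

```python
def deploy_flow(api, reg_client_id, bucket_name, flow_name, location=(0.0, 0.0)):
    """Deploy the latest version of a flow and start it; `api` is `nipyapi`."""
    versioning, canvas = api.versioning, api.canvas

    # Locate the bucket and the flow by name.
    bucket = versioning.get_registry_bucket(bucket_name)
    flow = versioning.get_flow_in_bucket(bucket.identifier, flow_name)

    # Pin the latest version explicitly instead of relying on the default.
    latest = versioning.get_latest_flow_ver(bucket.identifier, flow.identifier)
    version = latest.snapshot_metadata.version

    # Deploy the process group onto the root canvas at `location`.
    pg = versioning.deploy_flow_version(
        canvas.get_root_pg_id(), location,
        bucket.identifier, flow.identifier, reg_client_id, version)

    # Start the processors, then enable transmission on every RPG in the group.
    canvas.schedule_process_group(pg.id, True)
    for rpg in canvas.list_all_remote_process_groups(pg.id):
        canvas.set_remote_process_group_transmission(rpg, True)
    return pg
```

Calling this once per NiFi host, after pointing `nipyapi.config.nifi_config.host` at that host, covers the rollout to N servers.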

So, with the help of the NiPyAPI library, we connected the registry, rolled out the flow, and even started the processors and data transfer. After that you can tidy up the code, add all kinds of checks and logging, and that's it. But that is a completely different story.

Of the automation options I considered, the last one seemed to me the most workable. First, it is plain Python code, so you can embed auxiliary logic and enjoy all the benefits of a programming language. Second, the NiPyAPI project is actively developed, and if problems arise you can write to the developer. Third, NiPyAPI is a more flexible tool for interacting with NiFi when solving complex problems, for example determining whether the message queues in the flow are currently empty and whether the process group can be updated.

That's all. I described three approaches to automating flow delivery in NiFi, pointed out the pitfalls a developer may encounter, and gave working code for automating delivery. If you are as interested in this topic as I am, write to me!

Source: habr.com
