Hi all!
The task is as follows: there is a flow, shown in the picture above, that needs to be rolled out to N servers with NiFi Site to Site (S2S). S2S is a secure, highly customizable way to transfer data between NiFi instances; how S2S works is described in the NiFi documentation.
When it comes to data transfer using S2S, one instance is called the client and the other the server. The client sends data; the server receives it. There are two ways to set up data transfer between them:
- Push. Data is sent from the client instance using a Remote Process Group (RPG); on the server instance, data is received through an Input Port.
- Pull. The server pulls data using an RPG; the client exposes it through an Output Port.
The flow to be rolled out is stored in Apache NiFi Registry.
Apache NiFi Registry is a subproject of Apache NiFi that provides flow storage and versioning, a sort of Git for flows. Information about installing, configuring and working with the registry can be found in the project documentation.
At the start, when N is small, the flow can be delivered and updated by hand in a reasonable time.
But as N grows, there are more problems:
- updating the flow takes more time, since you have to log in to every server
- template updates become inconsistent: updated here, forgotten there
- human error grows with the number of repetitive operations
All this brings us to the fact that it is necessary to automate the process. I have tried the following ways to solve this problem:
- Use MiNiFi instead of NiFi
- NiFi CLI
- NiPyAPI
Using MiNiFi
Another subproject, MiNiFi C2 Server, can help solve this problem. This product is intended to be the central point of the deployment architecture. How to configure the environment is described in a separate article.
The option described in the article above is working and not difficult to implement, but we must not forget the following:
- MiNiFi does not include all the processors available in NiFi
- processor versions in MiNiFi lag behind processor versions in NiFi.
At the time of writing, the latest NiFi version is 1.9.2, while the latest MiNiFi release ships processors at version 1.7.0. Processors can be added to MiNiFi, but because of the version gap between NiFi and MiNiFi processors, this may not work.
NiFi CLI
Judging by the documentation, the NiFi CLI is well suited for this task.
Run the utility
./bin/cli.sh
(ASCII-art "Apache nifi" banner)
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
In order to load the required flow from the registry, we need to know the identifiers of the bucket (bucket identifier) and of the flow itself (flow identifier). This data can be obtained either through the CLI or in the NiFi Registry web interface. The web interface looks like this:
Using the CLI, you do this:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Run import process group from registry:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
An important point is that any NiFi instance can be specified as the host onto which we roll out the process group.
The process group is added with its processors stopped; they need to be started:
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
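To roll the same flow out to N servers, the two CLI calls above can be scripted. Below is a minimal sketch of such a loop; the helper names and host list are mine, and it assumes the script runs from the toolkit directory where `./bin/cli.sh` lives:

```python
import subprocess

BUCKET = "709d387a-9ce9-4535-8546-3621efe38e96"
FLOW = "d27af00a-5b47-4910-89cd-9c664cd91e85"

def import_cmd(host, bucket, flow, version):
    """Build the `nifi pg-import` CLI invocation for one target host."""
    return ["./bin/cli.sh", "nifi", "pg-import",
            "-b", bucket, "-f", flow, "-fv", str(version),
            "-u", f"http://{host}:8080"]

def start_cmd(host, pg_id):
    """Build the `nifi pg-start` invocation for the imported process group."""
    return ["./bin/cli.sh", "nifi", "pg-start",
            "-pgid", pg_id, "-u", f"http://{host}:8080"]

def rollout(hosts, bucket=BUCKET, flow=FLOW, version=1):
    """Import and start the flow on every host; returns pg id per host."""
    result = {}
    for host in hosts:
        # pg-import prints the new process group id on stdout
        out = subprocess.run(import_cmd(host, bucket, flow, version),
                             capture_output=True, text=True, check=True)
        pg_id = out.stdout.strip()
        subprocess.run(start_cmd(host, pg_id),
                       capture_output=True, text=True, check=True)
        result[host] = pg_id
    return result
```

This already removes the "go to all servers by hand" problem, but it still leaves the transmission issue discussed next.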
Great, the processors have started. However, according to the conditions of the problem, we need NiFi instances to send data to other instances. Let's assume that the Push method was chosen to transfer data to the server. To organize the transfer, it is necessary to enable transmission (Enable transmitting) on the added Remote Process Group (RPG), which is already part of our flow.
In the CLI documentation and other sources, I did not find a way to enable transmission. If you know how to do this, please write in the comments.
Since we have bash and we are ready to go to the end, we will find a way out! The NiFi API can solve this problem. We will use the following method, taking the ID from the examples above (in our case, 7f522a13-016e-1000-e504-d5b15587f2f3). A description of the NiFi API methods is available in the REST API documentation.
In the request body, you need to pass JSON of the following form:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parameters that must be filled in for it to work:
state - the transmission status. TRANSMITTING enables data transfer, STOPPED disables it
version - the revision version of the component
version defaults to 0 when the component is created; the current value can be obtained using the corresponding GET method.
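As a sketch of that API call from Python, the body above can be built and PUT to the remote process group's run-status endpoint. This assumes an unsecured instance and NiFi's `/remote-process-groups/{id}/run-status` endpoint; the helper names are mine:

```python
import json
import urllib.request

def transmission_body(version, client_id="my-client", state="TRANSMITTING"):
    """Build the JSON body shown above; state and revision are what matter."""
    return {
        "revision": {"clientId": client_id, "version": version},
        "state": state,
        "disconnectedNodeAcknowledged": False,
    }

def set_transmission(nifi_api, rpg_id, version, enable=True):
    """PUT the run-status body to the remote process group endpoint."""
    state = "TRANSMITTING" if enable else "STOPPED"
    url = f"{nifi_api}/remote-process-groups/{rpg_id}/run-status"
    data = json.dumps(transmission_body(version, state=state)).encode()
    req = urllib.request.Request(
        url, data=data, method="PUT",
        headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```

Usage would look like `set_transmission("http://nifi:8080/nifi-api", "7f522a13-016e-1000-e504-d5b15587f2f3", 0)`.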
For lovers of bash scripts this approach may seem suitable, but it is hard for me: bash scripts are not my favorite. The next way is more interesting and, in my opinion, more convenient.
NiPyAPI
NiPyAPI is a Python library for interacting with NiFi instances.
Our script for rolling out the configuration is a Python program. Let's move on to coding.
Set up configs for further work. We will need the following parameters:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' # path to the nifi-api of the instance where we deploy the process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' # path to the nifi-registry-api of the registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' # name under which the registry will appear in the nifi instance
nipyapi.config.bucket_name = 'BucketName' # name of the bucket from which we pull the flow
nipyapi.config.flow_name = 'FlowName' # name of the flow we pull
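Before going further, it can be useful to verify that the configured NiFi endpoint actually responds. A small sketch (the helper names are mine; `/flow/about` is NiFi's version-info endpoint):

```python
import json
import urllib.request

def about_url(nifi_api_base):
    """GET <nifi-api>/flow/about returns info about the NiFi instance."""
    return nifi_api_base.rstrip('/') + '/flow/about'

def nifi_version(nifi_api_base, timeout=5):
    """Return the reported NiFi version, or None if the host is unreachable."""
    try:
        with urllib.request.urlopen(about_url(nifi_api_base),
                                    timeout=timeout) as resp:
            return json.load(resp)["about"]["version"]
    except OSError:
        return None
```

Failing fast here gives a much clearer error than a stack trace from deep inside a deployment call.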
Below I will reference the methods of this library, which are described in its documentation.
We connect the registry to the nifi instance using
nipyapi.versioning.create_registry_client
At this step, you can also check whether the registry has already been added to the instance; for this, use the method
nipyapi.versioning.list_registry_clients
We find the bucket, so that we can then search for the flow inside it:
nipyapi.versioning.get_registry_bucket
Using the found bucket, we look up the flow:
nipyapi.versioning.get_flow_in_bucket
Next, it is important to understand whether this process group has already been added. The process group is placed by coordinates, and a situation may arise where a second copy is laid on top of the first. I checked - it can happen :) To get all the added process groups, use the method
nipyapi.canvas.list_all_process_groups
and then we can search, for example, by name.
I will not describe the process of updating the template in detail. I will only say that if processors are added in the new version of the template, there are no problems with messages present in the queues. But if processors are removed, problems may arise: NiFi does not allow removing a processor if a message queue has accumulated in front of it. If you are interested in how I solved this problem, please write to me and we will discuss it; contacts are at the end of the article. Let's move on to the step of adding the process group.
While debugging the script, I came across a quirk: the latest flow version is not always pulled, so I recommend first checking which version that is:
nipyapi.versioning.get_latest_flow_ver
Deploy process group:
nipyapi.versioning.deploy_flow_version
We start the processors:
nipyapi.canvas.schedule_process_group
Remember how in the CLI section I wrote that data transfer is not automatically enabled on the remote process group? I ran into the same problem when implementing the script. At the time I could not start the transmission via the API, so I decided to write to the developer of the NiPyAPI library and ask for advice. The developer replied, we discussed the problem, and he wrote that he needed time to "check something". A couple of days later an email arrived containing a Python function that solved my startup problem! At that time the NiPyAPI version was 0.13.3 and, of course, it contained nothing of the kind. But in version 0.14.0, released quite recently, this function is already included in the library. Meet
nipyapi.canvas.set_remote_process_group_transmission
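Putting the calls above together, the whole rollout can be sketched as one function. This is a sketch under my assumptions (canvas coordinates, description strings, helper name), not the author's exact script, and it needs live NiFi and Registry instances to actually run; `nipyapi` is imported inside the function so the example file loads even without the library installed:

```python
def deploy_flow(nifi_api, registry_api, registry_name,
                bucket_name, flow_name, location=(200.0, 200.0)):
    """Sketch of the full rollout: register the registry client, resolve
    the bucket and flow, deploy the latest version, start everything."""
    import nipyapi  # imported here so the file loads without nipyapi

    nipyapi.config.nifi_config.host = nifi_api
    nipyapi.config.registry_config.host = registry_api

    # reuse an existing registry client if one with this name is present
    clients = nipyapi.versioning.list_registry_clients().registries
    client = next((c for c in clients
                   if c.component.name == registry_name), None)
    if client is None:
        client = nipyapi.versioning.create_registry_client(
            registry_name, registry_api, "added by deploy script")

    bucket = nipyapi.versioning.get_registry_bucket(bucket_name)
    flow = nipyapi.versioning.get_flow_in_bucket(bucket.identifier, flow_name)

    # pin the version explicitly: the latest one is not always pulled
    latest = nipyapi.versioning.get_latest_flow_ver(
        bucket.identifier, flow.identifier)
    pg = nipyapi.versioning.deploy_flow_version(
        nipyapi.canvas.get_root_pg_id(), location,
        bucket.identifier, flow.identifier, client.id,
        version=latest.snapshot_metadata.version)

    # start the processors, then enable transmission on every RPG inside
    nipyapi.canvas.schedule_process_group(pg.id, True)
    for rpg in nipyapi.canvas.list_all_remote_process_groups(pg.id):
        nipyapi.canvas.set_remote_process_group_transmission(
            rpg.id, enable=True)
    return pg
```

Looping this function over a list of `nifi_api` URLs covers all N servers from the original task.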
So, with the help of the NiPyAPI library, we connected the registry, rolled out the flow, and even started the processors and data transmission. Then you can polish the code, add all kinds of checks and logging - but that's a completely different story.
Of the automation options I considered, the last seemed the most effective to me. First, it is still Python code, into which you can embed auxiliary logic and enjoy all the benefits of a programming language. Second, the NiPyAPI project is actively developed, and in case of problems you can write to the developer. Third, NiPyAPI is the more flexible tool for interacting with NiFi when solving complex problems - for example, determining whether the message queues in the flow are currently empty and whether the process group can be updated.
That's all. I described three approaches to automating flow delivery in NiFi, the pitfalls a developer may encounter, and provided working code for automating the delivery. If this topic interests you as much as it does me -
Source: habr.com