Cloud-based messaging on the Red Hat OpenShift platform using Quarkus and AMQ Online

Hi all! Here is the final post in our Quarkus series. (By the way, watch our webinar "This is Quarkus - Kubernetes native Java framework", where we show how to start from scratch or migrate existing solutions.)

In the previous post, we looked at tools that can be used to quantify the improvements gained by modernizing Java applications.

Starting with version 0.17.0, Quarkus supports the Advanced Message Queuing Protocol (AMQP), an open standard for transferring business messages between applications or organizations.

Red Hat AMQ Online is a service built on the open source EnMasse project that provides messaging on the Red Hat OpenShift platform. For more information on how it works, see here (EN). Today we'll show you how to combine AMQ Online and Quarkus, two new messaging technologies, to build a modern messaging system on OpenShift.

It is assumed that you have already deployed AMQ Online on the OpenShift platform (if not, see the installation guide).

To begin, we will create a Quarkus application, which will be a simple order processing system using reactive messaging. This application will include an order generator that sends orders to a message queue at a fixed interval, as well as an order processor that will process messages from the queue and generate browser-viewable confirmations.

After creating the application, we will show how to inject the configuration of the messaging system into it and use AMQ Online to initialize the resources we need on this system.

Quarkus App

Our Quarkus application runs on OpenShift and is a modified version of the amqp-quickstart example. A complete client-side example can be found here.

Order Generator

The generator simply sends monotonically increasing order IDs to the "orders" address every 5 seconds.

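import java.util.concurrent.TimeUnit;
import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.reactivex.Flowable;

@ApplicationScoped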
public class OrderGenerator {
 
    private int orderId = 1;
 
    @Outgoing("orders")
    public Flowable<Integer> generate() {
        return Flowable.interval(5, TimeUnit.SECONDS)
        .map(tick -> orderId++);
    }
}

Order Processor

The order processor is even simpler: it just returns a confirmation ID to the "confirmations" address.

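import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped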
public class OrderProcessor {
    @Incoming("orders")
    @Outgoing("confirmations")
    public Integer process(Integer order) {
        // The confirmation ID is twice the order ID :-)
        return order * 2;
    }
}
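
The generator and processor above only do anything once a broker and the Quarkus runtime are in place. As a rough illustration of the data flow alone, here is a broker-free sketch in plain Java. The class OrderPipelineSketch and its method names are hypothetical and are not part of the example project; an in-memory queue stands in for the "orders" address and the 5-second interval is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, broker-free sketch of: generator -> "orders" -> processor -> "confirmations".
final class OrderPipelineSketch {

    // Same rule as OrderProcessor.process: the confirmation ID is twice the order ID.
    static int process(int order) {
        return order * 2;
    }

    // Puts `count` increasing order IDs (starting at 1) on the queue,
    // then drains the queue through process() and collects the confirmations.
    static List<Integer> run(int count) {
        BlockingQueue<Integer> orders = new LinkedBlockingQueue<>(); // stands in for the "orders" queue
        for (int orderId = 1; orderId <= count; orderId++) {
            orders.offer(orderId);
        }
        List<Integer> confirmations = new ArrayList<>();
        Integer next;
        while ((next = orders.poll()) != null) {
            confirmations.add(process(next));
        }
        return confirmations;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [2, 4, 6]
    }
}
```

In the real application the queue lives in AMQ Online, so the generator and the processor could equally run in separate pods.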

Confirmation Resource

The confirmation resource is an HTTP endpoint that lists the confirmations generated by our application.

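import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.reactivestreams.Publisher;
import io.smallrye.reactive.messaging.annotations.Stream;

@Path("/confirmations")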
public class ConfirmationResource {
 
    @Inject
    @Stream("confirmations") Publisher<Integer> orders;
 
    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "hello";
    }
 
 
    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Publisher<Integer> stream() {
        return orders;
    }
}

Configuration

To connect to AMQ Online, our application needs some configuration data: the Quarkus connector configuration, the AMQP endpoint information, and the client credentials. It would of course be better to keep all the configuration data in one place, but we deliberately split it up to show the different ways of configuring a Quarkus application.

Connectors

The connector configuration can be provided at compile time using the application properties file:

mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.incoming.orders.connector=smallrye-amqp

To keep things simple, we will use a message queue only for the "orders" address. The "confirmations" address will use an in-memory queue inside our application.

AMQP Endpoint

The hostname and port number of the AMQP endpoint are not known at compile time, so they must be injected. The endpoint is published in the configmap created by AMQ Online, so we define both values via environment variables in the application manifest:

spec:
  template:
    spec:
      containers:
      - env:
        - name: AMQP_HOST
          valueFrom:
            configMapKeyRef:
              name: quarkus-config
              key: service.host
        - name: AMQP_PORT
          valueFrom:
            configMapKeyRef:
              name: quarkus-config
              key: service.port.amqp
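
The manifest sets AMQP_HOST and AMQP_PORT, while configuration properties are usually written in lower case with dashes. MicroProfile Config bridges the two by also looking up an environment variable whose name is the property name with non-alphanumeric characters replaced by underscores and converted to upper case. The sketch below is a simplified version of that rule in plain Java. The class EnvConfigSketch is hypothetical, the property names amqp-host and amqp-port are assumed by analogy with amqp-username and amqp-password used later in this post, and the host value is made up.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of how a property name can be resolved from environment variables.
final class EnvConfigSketch {

    // Simplified MicroProfile Config rule: non-alphanumeric characters become '_', then upper case.
    static String toEnvName(String property) {
        return property.replaceAll("[^A-Za-z0-9_]", "_").toUpperCase(Locale.ROOT);
    }

    // Tries the exact property name first, then the converted environment-style name.
    static Optional<String> lookup(String property, Map<String, String> env) {
        String exact = env.get(property);
        if (exact != null) {
            return Optional.of(exact);
        }
        return Optional.ofNullable(env.get(toEnvName(property)));
    }

    public static void main(String[] args) {
        // Sample values only; in a pod these would come from System.getenv().
        Map<String, String> env = Map.of(
                "AMQP_HOST", "messaging.example.svc",
                "AMQP_PORT", "5672");
        String host = lookup("amqp-host", env).orElse("localhost");
        String port = lookup("amqp-port", env).orElse("5672");
        System.out.println(host + ":" + port); // prints messaging.example.svc:5672
    }
}
```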

Credentials

The service account token can be used to authenticate our application to OpenShift. To do this, you first need to create a custom ConfigSource that will read the authentication token from the pod's file system:

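import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.eclipse.microprofile.config.spi.ConfigSource;

public class MessagingCredentialsConfigSource implements ConfigSource {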
    private static final Set<String> propertyNames;
 
    static {
        propertyNames = new HashSet<>();
        propertyNames.add("amqp-username");
        propertyNames.add("amqp-password");
    }
 
    @Override
    public Set<String> getPropertyNames() {
        return propertyNames;
    }
 
    @Override
    public Map<String, String> getProperties() {
        try {
            Map<String, String> properties = new HashMap<>();
            properties.put("amqp-username", "@@serviceaccount@@");
            properties.put("amqp-password", readTokenFromFile());
            return properties;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
 
    @Override
    public String getValue(String key) {
        if ("amqp-username".equals(key)) {
            return "@@serviceaccount@@";
        }
        if ("amqp-password".equals(key)) {
            try {
                return readTokenFromFile();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return null;
    }
 
    @Override
    public String getName() {
        return "messaging-credentials-config";
    }
 
    private static String readTokenFromFile() throws IOException {
        return new String(Files.readAllBytes(Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token")), StandardCharsets.UTF_8);
    }
}
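
Because the token path is hard-coded, the class above can only be exercised inside a pod. One possible variation, shown as a sketch rather than as what the example project does, is to pass the path in so that the reading logic can be tried against a temporary file. The class TokenFileSketch is hypothetical, and trimming the content is a defensive assumption in case the file ends with a newline.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: reads a token from a caller-supplied file instead of a fixed path.
final class TokenFileSketch {

    // Returns the file content as UTF-8 text with surrounding whitespace removed.
    static String readToken(Path tokenFile) {
        try {
            return new String(Files.readAllBytes(tokenFile), StandardCharsets.UTF_8).trim();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // A temporary file stands in for the mounted service account token.
        Path tmp = Files.createTempFile("token", ".txt");
        try {
            Files.write(tmp, "abc123\n".getBytes(StandardCharsets.UTF_8));
            System.out.println(readToken(tmp)); // prints abc123
        } finally {
            Files.delete(tmp);
        }
    }
}
```

Note that a custom ConfigSource is discovered through the Java ServiceLoader mechanism, so its fully qualified class name has to be listed in META-INF/services/org.eclipse.microprofile.config.spi.ConfigSource.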

Building and Deploying the Application

Since the application is compiled into a native executable, GraalVM is required. For details on how to set up the environment, see the relevant instructions in the Quarkus Guide.

Then, following those instructions, download the source, then build and deploy the application:

git clone https://github.com/EnMasseProject/enmasse-example-clients
cd enmasse-example-clients/quarkus-example-client
oc new-project myapp
mvn -Pnative -Dfabric8.mode=openshift -Dfabric8.build.strategy=docker package fabric8:build fabric8:resource fabric8:apply

After these commands, the application is deployed, but it will not start until we configure the messaging resources it needs in AMQ Online.

Setting up the messaging system

Now we need to define the resources our application requires in the messaging system. To do this, create: 1) an address space, to initialize the messaging endpoint; 2) an address, to configure the addresses used by the application; 3) a messaging user, to set the client credentials.

Address space

An AddressSpace object in AMQ Online is a group of addresses that share connection endpoints and authentication and authorization policies. When creating an address space, you can specify how exactly the messaging endpoints will be exposed:

apiVersion: enmasse.io/v1beta1
kind: AddressSpace
metadata:
  name: quarkus-example
spec:
  type: brokered
  plan: brokered-single-broker
  endpoints:
  - name: messaging
    service: messaging
    exports:
    - name: quarkus-config
      kind: configmap

Addresses

Addresses are used to send and receive messages. Each address has a type that defines its semantics, and a plan that specifies the amount of resources to reserve. The address can be specified, for example, like this:

apiVersion: enmasse.io/v1beta1
kind: Address
metadata:
  name: quarkus-example.orders
spec:
  address: orders
  type: queue
  plan: brokered-queue

Messaging user

To ensure that only trusted applications can send and receive messages on your addresses, you need to create a user in the messaging system. For applications running on the cluster, clients can be authenticated using an OpenShift service account. A user with the "serviceaccount" authentication type can be defined, for example, like this:

apiVersion: user.enmasse.io/v1beta1
kind: MessagingUser
metadata:
  name: quarkus-example.app
spec:
  username: system:serviceaccount:myapp:default
  authentication:
    type: serviceaccount
  authorization:
  - operations: ["send", "recv"]
    addresses: ["orders"]
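
The username in this manifest follows the standard Kubernetes naming scheme for service accounts: system:serviceaccount:<namespace>:<name>. Here the namespace is the myapp project created earlier and the account is default. A minimal sketch of that scheme, with a hypothetical helper class ServiceAccountNames:

```java
// Hypothetical helper that builds the Kubernetes-style username of a service account.
final class ServiceAccountNames {

    // Returns "system:serviceaccount:<namespace>:<serviceAccount>".
    static String username(String namespace, String serviceAccount) {
        if (namespace == null || namespace.isEmpty()
                || serviceAccount == null || serviceAccount.isEmpty()) {
            throw new IllegalArgumentException("namespace and service account name are required");
        }
        return "system:serviceaccount:" + namespace + ":" + serviceAccount;
    }

    public static void main(String[] args) {
        System.out.println(username("myapp", "default")); // prints system:serviceaccount:myapp:default
    }
}
```

If the application is deployed to a different project or runs under a different service account, the username in the MessagingUser has to change accordingly.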

Permissions for application configuration

In order for AMQ Online to create the configmap we used to inject the AMQP endpoint information, you need to set the role and the role binding (Role and RoleBinding):

---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: quarkus-config
rules:
- apiGroups: [ "" ]
  resources: [ "configmaps" ]
  verbs: [ "create" ]
- apiGroups: [ "" ]
  resources: [ "configmaps" ]
  resourceNames: [ "quarkus-config" ]
  verbs: [ "get", "update", "patch" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: quarkus-config
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: quarkus-config
subjects:
- kind: ServiceAccount
  name: address-space-controller
  namespace: amq-online-infra

How to apply configurations

You can apply the messaging system configuration like this:

cd enmasse-example-clients/quarkus-example-client
oc project myapp
oc apply -f src/main/resources/k8s/addressspace
oc apply -f src/main/resources/k8s/address

Application Verification

To make sure that the application has started, first check that the corresponding addresses have been created and are active:

until [[ "$(oc get address quarkus-example.orders -o jsonpath='{.status.phase}')" == "Active" ]]; do echo "Not yet ready"; sleep 5; done
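
The shell loop above waits indefinitely if the address never becomes active. The same check with an upper bound on the number of attempts could look like the following sketch. The class PhasePoller is hypothetical, and the phase is supplied by the caller (for example by running the oc command), so the phase values in main are sample data only.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper: polls a status value until it matches or the attempts run out.
final class PhasePoller {

    // Returns true as soon as phase.get() equals `expected`; false after maxAttempts or on interrupt.
    static boolean awaitPhase(Supplier<String> phase, String expected, int maxAttempts, long delayMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (expected.equals(phase.get())) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Sample sequence of phases standing in for repeated `oc get address` calls.
        Iterator<String> phases = List.of("Pending", "Configuring", "Active").iterator();
        System.out.println(awaitPhase(phases::next, "Active", 10, 100)); // prints true
    }
}
```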

Next, check the app's route URL (simply open it in a browser):

echo "http://$(oc get route quarkus-example-client -o jsonpath='{.spec.host}')/prices.html"

In the browser, you should see the page update periodically as messages are sent and received through AMQ Online.

Summing up

So, we have written a Quarkus application that uses AMQP for messaging, configured it to run on the Red Hat OpenShift platform, and injected its configuration based on the AMQ Online configuration. We then created the manifests needed to initialize the messaging system for our application.

This concludes the series about Quarkus, but there are many new and interesting things ahead, stay tuned!

Source: habr.com
