使用 Quarkus 和 AMQ Online 在红帽 OpenShift 平台上进行云原生消息传递

大家好! 这就是我们 Quarkus 系列的最后一篇文章! (顺便说一下,观看我们的网络研讨会 “这是 Quarkus – Kubernetes 原生 Java 框架”。 我们将向您展示如何从头开始或转移现成的解决方案)

使用 Quarkus 和 AMQ Online 在红帽 OpenShift 平台上进行云原生消息传递

В 以前的 在这篇文章中,我们研究了可用于量化 Java 应用程序现代化所带来的改进的相关工具。

从0.17.0版本开始, 夸库斯 支持使用高级消息队列协议(空气质量计划),这是一个用于在应用程序或组织之间传输业务消息的开放标准。

红帽 AMQ 在线 是一个基于开源项目构建的服务 EnMasse 并实现基于平台的消息传递机制 红帽OpenShift。 有关其工作原理的更多详细信息,请参阅 这里 (EN)。 今天,我们将向您展示如何结合 AMQ Online 和 Quarkus,使用两种新的消息传递技术构建基于 OpenShift 的现代消息传递系统。

假设您已经在 OpenShift 平台上部署了 AMQ Online(如果没有,请参阅 安装指南).

首先,我们将创建一个 Quarkus 应用程序,它将是一个使用反应式消息传递的简单订单处理系统。 该应用程序将包括一个订单生成器,以固定的时间间隔将订单发送到消息队列,以及一个订单处理器,用于处理来自队列的消息并生成可在浏览器中查看的确认信息。

创建应用程序后,我们将向您展示如何将消息传递系统配置嵌入到应用程序中,并使用 AMQ Online 来配置我们在系统上所需的资源。

夸库斯应用程序

我们的 Quarkus 应用程序在 OpenShift 上运行,是该程序的修改版本 amqp-快速入门。 可以找到客户端的完整示例 这里.

订单生成器

生成器只是每 5 秒单调地将不断增长的订单 ID 发送到“订单”地址。

@ApplicationScoped
public class OrderGenerator {
 
    private int orderId = 1;
 
    @Outgoing("orders")
    public Flowable<Integer> generate() {
        return Flowable.interval(5, TimeUnit.SECONDS)
        .map(tick -> orderId++);
    }
}

订单处理器

订单处理程序甚至更简单,它只是将确认 ID 返回到“确认”地址。

@ApplicationScoped
public class OrderProcessor {
    @Incoming("orders")
    @Outgoing("confirmations")
    public Integer process(Integer order) {
        // Идентификатор подтверждения равен удвоенному идентификатору заказа <img draggable="false" class="emoji" alt=":-)" src="https://s.w.org/images/core/emoji/11.2.0/svg/1f642.svg">
        return order * 2;
    }
}

确认资源

确认资源是一个 HTTP 端点,用于列出我们的应用程序生成的确认。

@Path("/confirmations")
public class ConfirmationResource {
 
    @Inject
    @Stream("confirmations") Publisher<Integer> orders;
 
    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "hello";
    }
 
 
    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Publisher<Integer> stream() {
        return orders;
    }
}

调整

要连接到 AMQ Online,我们的应用程序将需要一些配置数据,即:Quarkus 连接器配置、AMQP 端点信息和客户端凭据。 当然,最好将所有配置数据保存在一个地方,但我们会故意将它们分开以显示配置 Quarkus 应用程序的可能选项。

连接器

可以使用应用程序属性文件在编译时提供连接器配置:

mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.incoming.orders.connector=smallrye-amqp

为了简单起见,我们将仅使用消息队列作为“订单”地址。 我们应用程序中的“确认”地址将使用内存中的队列。

AMQP端点

在编译时,AMQP 端点的主机名和端口号是未知的,因此必须注入它们。 端点可以在 AMQ Online 创建的 configmap 中设置,因此我们将通过应用程序清单中的环境变量来定义它们:

spec:
  template:
    spec:
      containers:
      - env:
        - name: AMQP_HOST
          valueFrom:
            configMapKeyRef:
              name: quarkus-config
              key: service.host
        - name: AMQP_PORT
          valueFrom:
            configMapKeyRef:
              name: quarkus-config
              key: service.port.amqp

证书

服务帐户令牌可用于向 OpenShift 验证我们的应用程序。 为此,您必须首先创建一个自定义 ConfigSource,它将从 pod 的文件系统读取身份验证令牌:

public class MessagingCredentialsConfigSource implements ConfigSource {
    private static final Set<String> propertyNames;
 
    static {
        propertyNames = new HashSet<>();
        propertyNames.add("amqp-username");
        propertyNames.add("amqp-password");
    }
 
    @Override
    public Set<String> getPropertyNames() {
        return propertyNames;
    }
 
    @Override
    public Map<String, String> getProperties() {
        try {
            Map<String, String> properties = new HashMap<>();
            properties.put("amqp-username", "@@serviceaccount@@");
            properties.put("amqp-password", readTokenFromFile());
            return properties;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
 
    @Override
    public String getValue(String key) {
        if ("amqp-username".equals(key)) {
            return "@@serviceaccount@@";
        }
        if ("amqp-password".equals(key)) {
            try {
                return readTokenFromFile();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return null;
    }
 
    @Override
    public String getName() {
        return "messaging-credentials-config";
    }
 
    private static String readTokenFromFile() throws IOException {
        return new String(Files.readAllBytes(Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token")), StandardCharsets.UTF_8);
    }
}

构建和部署应用程序

由于应用程序必须编译成可执行文件,因此需要 GraalVM 虚拟机。 有关如何为此设置环境的详细信息,请参阅相应的说明 夸库斯指南.

然后,按照那里给出的说明,您需要下载源代码,构建并部署我们的应用程序:

git clone https://github.com/EnMasseProject/enmasse-example-clients
cd enmasse-example-clients/quarkus-example-client
oc new-project myapp
mvn -Pnative -Dfabric8.mode=openshift -Dfabric8.build.strategy=docker package fabric8:build fabric8:resource fabric8:apply

在这些命令之后,应用程序将被部署,但直到我们在 AMQ Online 中配置所需的消息传递资源后才会启动。

设置消息系统

现在剩下的就是在消息系统中设置我们的应用程序所需的资源。 为此,您需要创建: 1) 一个地址空间来初始化消息系统端点; 2)地址配置我们在应用程序中使用的地址; 3) 向用户发送消息以设置客户端凭据。

地址空间

AMQ Online 中的 AddressSpace 对象是一组共享连接端点以及身份验证和授权策略的地址。 创建地址空间时,您可以指定如何公开消息传递端点:

apiVersion: enmasse.io/v1beta1
kind: AddressSpace
metadata:
  name: quarkus-example
spec:
  type: brokered
  plan: brokered-single-broker
  endpoints:
  - name: messaging
    service: messaging
    exports:
    - name: quarkus-config
      kind: configmap

Адреса

地址用于发送和接收消息。 每个地址都有一个类型(决定其语义)和一个计划(指定要保留的资源数量)。 例如,可以这样确定地址:

apiVersion: enmasse.io/v1beta1
kind: Address
metadata:
  name: quarkus-example.orders
spec:
  address: orders
  type: queue
  plan: brokered-queue

消息传递用户

为了确保只有受信任的应用程序才能向您的地址发送和接收消息,您必须在消息传递系统中创建一个用户。 对于在集群上运行的应用程序,可以使用 OpenShift 服务帐户对客户端进行身份验证。 例如,可以像这样定义用户“serviceaccount”:

apiVersion: user.enmasse.io/v1beta1
kind: MessagingUser
metadata:
  name: quarkus-example.app
spec:
  username: system:serviceaccount:myapp:default
  authentication:
    type: serviceaccount
  authorization:
  - operations: ["send", "recv"]
    addresses: ["orders"]

配置应用程序的权限

为了让 AMQ Online 创建我们用来嵌入 AMQP 端点信息的 configmap,必须设置 Role 和 RoleBinding:

---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: quarkus-config
spec:
  rules:
  - apiGroups: [ "" ]
    resources: [ "configmaps" ]
    verbs: [ "create" ]
  - apiGroups: [ "" ]
    resources: [ "configmaps" ]
    resourceNames: [ "quarkus-config" ]
    verbs: [ "get", "update", "patch" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: quarkus-config
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: quarkus-config
subjects:
- kind: ServiceAccount
  name: address-space-controller
  namespace: amq-online-infra

如何应用配置

您可以像这样应用消息系统配置:

cd enmasse-example-clients/quarkus-example-client
oc project myapp
oc apply -f src/main/resources/k8s/addressspace
oc apply -f src/main/resources/k8s/address

应用验证

为了确保应用程序已经启动,首先我们检查相应的地址是否已经创建并且处于活动状态:

until [[ `oc get address quarkus-example.prices -o jsonpath='{.status.phase}'` == "Active" ]]; do echo "Not yet ready"; sleep 5; done

然后我们查看一下应用路由URL(在浏览器中打开这个地址即可):

echo "http://$(oc get route quarkus-example-client -o jsonpath='{.spec.host}')/prices.html"

浏览器应显示票证在 AMQ Online 发送和接收消息时定期更新。

总结

因此,我们编写了一个使用 AMQP 进行消息传递的 Quarkus 应用程序,将应用程序配置为在 Red Hat OpenShift 平台上运行,并基于 AMQ Online 配置实现了其配置。 然后,我们创建了初始化应用程序的消息系统所需的清单。

关于 Quarkus 的系列就到此结束了,但接下来还有很多新的有趣的事情,敬请期待!

来源: habr.com

添加评论