大家好! 这就是我们 Quarkus 系列的最后一篇文章! (顺便说一下,观看我们的网络研讨会
В
从0.17.0版本开始,
假设您已经在 OpenShift 平台上部署了 AMQ Online(如果没有,请参阅
首先,我们将创建一个 Quarkus 应用程序,它将是一个使用反应式消息传递的简单订单处理系统。 该应用程序将包括一个订单生成器,以固定的时间间隔将订单发送到消息队列,以及一个订单处理器,用于处理来自队列的消息并生成可在浏览器中查看的确认信息。
创建应用程序后,我们将向您展示如何将消息传递系统配置嵌入到应用程序中,并使用 AMQ Online 来配置我们在系统上所需的资源。
夸库斯应用程序
我们的 Quarkus 应用程序在 OpenShift 上运行,是该程序的修改版本
订单生成器
生成器只是每 5 秒单调地将不断增长的订单 ID 发送到“订单”地址。
@ApplicationScoped
public class OrderGenerator {
private int orderId = 1;
@Outgoing("orders")
public Flowable<Integer> generate() {
return Flowable.interval(5, TimeUnit.SECONDS)
.map(tick -> orderId++);
}
}
订单处理器
订单处理程序甚至更简单,它只是将确认 ID 返回到“确认”地址。
@ApplicationScoped
public class OrderProcessor {
@Incoming("orders")
@Outgoing("confirmations")
public Integer process(Integer order) {
// Идентификатор подтверждения равен удвоенному идентификатору заказа <img draggable="false" class="emoji" alt=":-)" src="https://s.w.org/images/core/emoji/11.2.0/svg/1f642.svg">
return order * 2;
}
}
确认资源
确认资源是一个 HTTP 端点,用于列出我们的应用程序生成的确认。
@Path("/confirmations")
public class ConfirmationResource {
@Inject
@Stream("confirmations") Publisher<Integer> orders;
@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
return "hello";
}
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public Publisher<Integer> stream() {
return orders;
}
}
调整
要连接到 AMQ Online,我们的应用程序将需要一些配置数据,即:Quarkus 连接器配置、AMQP 端点信息和客户端凭据。 当然,最好将所有配置数据保存在一个地方,但我们会故意将它们分开以显示配置 Quarkus 应用程序的可能选项。
连接器
可以使用应用程序属性文件在编译时提供连接器配置:
mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.incoming.orders.connector=smallrye-amqp
为了简单起见,我们将仅使用消息队列作为“订单”地址。 我们应用程序中的“确认”地址将使用内存中的队列。
AMQP端点
在编译时,AMQP 端点的主机名和端口号是未知的,因此必须注入它们。 端点可以在 AMQ Online 创建的 configmap 中设置,因此我们将通过应用程序清单中的环境变量来定义它们:
spec:
template:
spec:
containers:
- env:
- name: AMQP_HOST
valueFrom:
configMapKeyRef:
name: quarkus-config
key: service.host
- name: AMQP_PORT
valueFrom:
configMapKeyRef:
name: quarkus-config
key: service.port.amqp
证书
服务帐户令牌可用于向 OpenShift 验证我们的应用程序。 为此,您必须首先创建一个自定义 ConfigSource,它将从 pod 的文件系统读取身份验证令牌:
public class MessagingCredentialsConfigSource implements ConfigSource {
private static final Set<String> propertyNames;
static {
propertyNames = new HashSet<>();
propertyNames.add("amqp-username");
propertyNames.add("amqp-password");
}
@Override
public Set<String> getPropertyNames() {
return propertyNames;
}
@Override
public Map<String, String> getProperties() {
try {
Map<String, String> properties = new HashMap<>();
properties.put("amqp-username", "@@serviceaccount@@");
properties.put("amqp-password", readTokenFromFile());
return properties;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public String getValue(String key) {
if ("amqp-username".equals(key)) {
return "@@serviceaccount@@";
}
if ("amqp-password".equals(key)) {
try {
return readTokenFromFile();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
return null;
}
@Override
public String getName() {
return "messaging-credentials-config";
}
private static String readTokenFromFile() throws IOException {
return new String(Files.readAllBytes(Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token")), StandardCharsets.UTF_8);
}
}
构建和部署应用程序
由于应用程序必须编译成可执行文件,因此需要 GraalVM 虚拟机。 有关如何为此设置环境的详细信息,请参阅相应的说明
然后,按照那里给出的说明,您需要下载源代码,构建并部署我们的应用程序:
git clone https://github.com/EnMasseProject/enmasse-example-clients
cd enmasse-example-clients/quarkus-example-client
oc new-project myapp
mvn -Pnative -Dfabric8.mode=openshift -Dfabric8.build.strategy=docker package fabric8:build fabric8:resource fabric8:apply
在这些命令之后,应用程序将被部署,但直到我们在 AMQ Online 中配置所需的消息传递资源后才会启动。
设置消息系统
现在剩下的就是在消息系统中设置我们的应用程序所需的资源。 为此,您需要创建: 1) 一个地址空间来初始化消息系统端点; 2)地址配置我们在应用程序中使用的地址; 3) 向用户发送消息以设置客户端凭据。
地址空间
AMQ Online 中的 AddressSpace 对象是一组共享连接端点以及身份验证和授权策略的地址。 创建地址空间时,您可以指定如何公开消息传递端点:
apiVersion: enmasse.io/v1beta1
kind: AddressSpace
metadata:
name: quarkus-example
spec:
type: brokered
plan: brokered-single-broker
endpoints:
- name: messaging
service: messaging
exports:
- name: quarkus-config
kind: configmap
Адреса
地址用于发送和接收消息。 每个地址都有一个类型(决定其语义)和一个计划(指定要保留的资源数量)。 例如,可以这样确定地址:
apiVersion: enmasse.io/v1beta1
kind: Address
metadata:
name: quarkus-example.orders
spec:
address: orders
type: queue
plan: brokered-queue
消息传递用户
为了确保只有受信任的应用程序才能向您的地址发送和接收消息,您必须在消息传递系统中创建一个用户。 对于在集群上运行的应用程序,可以使用 OpenShift 服务帐户对客户端进行身份验证。 例如,可以像这样定义用户“serviceaccount”:
apiVersion: user.enmasse.io/v1beta1
kind: MessagingUser
metadata:
name: quarkus-example.app
spec:
username: system:serviceaccount:myapp:default
authentication:
type: serviceaccount
authorization:
- operations: ["send", "recv"]
addresses: ["orders"]
配置应用程序的权限
为了让 AMQ Online 创建我们用来嵌入 AMQP 端点信息的 configmap,必须设置 Role 和 RoleBinding:
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: quarkus-config
spec:
rules:
- apiGroups: [ "" ]
resources: [ "configmaps" ]
verbs: [ "create" ]
- apiGroups: [ "" ]
resources: [ "configmaps" ]
resourceNames: [ "quarkus-config" ]
verbs: [ "get", "update", "patch" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: quarkus-config
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: quarkus-config
subjects:
- kind: ServiceAccount
name: address-space-controller
namespace: amq-online-infra
如何应用配置
您可以像这样应用消息系统配置:
cd enmasse-example-clients/quarkus-example-client
oc project myapp
oc apply -f src/main/resources/k8s/addressspace
oc apply -f src/main/resources/k8s/address
应用验证
为了确保应用程序已经启动,首先我们检查相应的地址是否已经创建并且处于活动状态:
until [[ `oc get address quarkus-example.prices -o jsonpath='{.status.phase}'` == "Active" ]]; do echo "Not yet ready"; sleep 5; done
然后我们查看一下应用路由URL(在浏览器中打开这个地址即可):
echo "http://$(oc get route quarkus-example-client -o jsonpath='{.spec.host}')/prices.html"
浏览器应显示票证在 AMQ Online 发送和接收消息时定期更新。
总结
因此,我们编写了一个使用 AMQP 进行消息传递的 Quarkus 应用程序,将应用程序配置为在 Red Hat OpenShift 平台上运行,并基于 AMQ Online 配置实现了其配置。 然后,我们创建了初始化应用程序的消息系统所需的清单。
关于 Quarkus 的系列就到此结束了,但接下来还有很多新的有趣的事情,敬请期待!
来源: habr.com