Я только изучаю Quarkus и Reactive Messaging. Я пытаюсь сообщение между двумя компонентами. Примеры, которые я нашел, продемонстрировали потоки, которые имеют известный набор данных, который передается в потоковом режиме или непрерывно повторяет полезную нагрузку. (Например, из быстрого запуска Kafka, который непрерывно передает новое случайное число в качестве цены)
Мне нужно поместить событие в поток только тогда, когда в бизнес-логике происходят определенные события c. Есть ли примеры?
Я нашел этот пост в StackOverflow, Есть ли в Quarkus какая-либо функция для отправки сообщения Кафке . Однако есть две проблемы:
Я не могу заставить эту форму работать.
- Излучатель всегда равен нулю.
- Я пытаюсь сделать это чисто, используя Reactive Messaging без прокачки через Кафку из фона
ОБНОВЛЕНИЕ: @ iabugho sh
Спасибо. Но я все еще получаю Эмиттер, которому вводят ноль. Вот соответствующие фрагменты кода: mp.messaging.outgoing.ownercreated.connector=smallrye-kafka
mp.messaging.outgoing.ownercreated.topic=ownercreated
mp.messaging.outgoing.ownercreated.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
`public class Owner {
@Inject
@Channel("ownercreated")
private static Emitter<Owner> ownerCreatedChannel;
public void persist() {
Owner.ownerCreatedChannel.send(this);
}
}`
Я также добавил в качестве экземпляра var.
ОБНОВЛЕНИЕ # 2 по запросу @ iabugho sh - Спасибо за ваша помощь!
package org.boosey;
import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;
import java.util.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
@ApplicationScoped
public class Owner {
private final Logger logger = Logger.getLogger(Owner.class.getName());
@Inject
@Channel("ownercreated")
private Emitter<Owner> ownerCreatedChannel;
public String name;
public String email;
public void persist() {
logger.info("IN PERSIST");
ownerCreatedChannel.send(this);
logger.info("SENT NEW OWNER");
}
}
application.properties:
mp.messaging.outgoing.ownercreated.connector=smallrye-kafka
mp.messaging.outgoing.ownercreated.topic=ownercreated
mp.messaging.outgoing.ownercreated.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
Метод Owner.persist вызывается из класса ресурсов REST Quarkus. Я убедился, что в Owner.persist получен правильно созданный объект Owner.
@Path("/owner")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@ApplicationScoped
public class OwnerResource {
@POST
public Response create(Owner owner) {
owner.persist();
return Response.status(201).build();
}
}