Реактивный обмен сообщениями: генерировать события при необходимости (используя Kafka) - PullRequest
0 голосов
/ 23 января 2020

Я только изучаю Quarkus и Reactive Messaging. Я пытаюсь сообщение между двумя компонентами. Примеры, которые я нашел, продемонстрировали потоки, которые имеют известный набор данных, который передается в потоковом режиме или непрерывно повторяет полезную нагрузку. (Например, из быстрого запуска Kafka, который непрерывно передает новое случайное число в качестве цены)

Мне нужно поместить событие в поток только тогда, когда в бизнес-логике происходят определенные события c. Есть ли примеры?

Я нашел этот пост в StackOverflow, Есть ли в Quarkus какая-либо функция для отправки сообщения Кафке . Однако есть две проблемы:

Я не могу заставить эту форму работать.

  1. Излучатель всегда равен нулю.
  2. Я пытаюсь сделать это чисто, используя Reactive Messaging без прокачки через Кафку из фона

ОБНОВЛЕНИЕ: @ iabugho sh

Спасибо. Но я все еще получаю Эмиттер, которому вводят ноль. Вот соответствующие фрагменты кода: mp.messaging.outgoing.ownercreated.connector=smallrye-kafka mp.messaging.outgoing.ownercreated.topic=ownercreated mp.messaging.outgoing.ownercreated.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

`public class Owner  {

    @Inject
    @Channel("ownercreated")
    private static Emitter<Owner> ownerCreatedChannel;

    public void persist() {
        Owner.ownerCreatedChannel.send(this);
    }
}`

Я также добавил в качестве экземпляра var.

ОБНОВЛЕНИЕ # 2 по запросу @ iabugho sh - Спасибо за ваша помощь!

package org.boosey;

import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;
import java.util.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

@ApplicationScoped
public class Owner {
    private final Logger logger = Logger.getLogger(Owner.class.getName());

    @Inject
    @Channel("ownercreated")
    private Emitter<Owner> ownerCreatedChannel;

    public String name;
    public String email;

    public void persist() {
        logger.info("IN PERSIST");

        ownerCreatedChannel.send(this);

        logger.info("SENT NEW OWNER");
    }
}

application.properties:

mp.messaging.outgoing.ownercreated.connector=smallrye-kafka
mp.messaging.outgoing.ownercreated.topic=ownercreated
mp.messaging.outgoing.ownercreated.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

Метод Owner.persist вызывается из класса ресурсов REST Quarkus. Я убедился, что в Owner.persist получен правильно созданный объект Owner.

@Path("/owner")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@ApplicationScoped
public class OwnerResource {

    @POST
    public Response create(Owner owner) {
        owner.persist();
        return Response.status(201).build();
    }
}

1 Ответ

1 голос
/ 23 января 2020

Если у вас правильно настроен outgoing topi c в файле application.properties, все, что вам нужно сделать, это вставить Emitter следующим образом:

@Inject
@Channel("your-channel")
Emitter<String> outgoingChannel;

и в вашей функции вы можете вызвать :

outgoingChannel.send(msg);

, где ваш -канал будет выглядеть так в конфигурационном файле:

mp.messaging.outgoing.your-channel.topic=kafka-topic

Обновление: переместите код эмиттера (вместе с аннотациями) в OwnerResource, он должен работать без сбоев. Также вы можете удалить @ApplicationScoped из Owner, если вы переместили этот код. Здесь происходит то, что объект Owner не создается CDI, поэтому он не вводит другие объекты. Привет.

...