Есть ли какая-либо функция в Quarkus для отправки сообщения Кафке? - PullRequest
1 голос
/ 11 октября 2019

Я новичок в kafka и quarkus, я хочу отправить сообщение в тему kafka после обработки запроса пользователя.

Я ознакомился с примером кафки, предоставленным в quarkus-quickstart. Я пытался с KafkaMessage

// when GET called send message to topic
@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
    generateSingle();
    return "hello";
}

@Outgoing("single-stations")
public KafkaMessage<Integer, String> generateSingle() {
    return KafkaMessage.of(1, "value");
};

Но я получил результат, что отправка сообщения в тему kafka постоянно.

Я хочу знать, есть ли другой метод или есть какие-либо проблемы с моим кодом,

Помощь оценена

1 Ответ

0 голосов
/ 17 октября 2019

Документация по этой теме является краткой и неполной в настоящее время (Quarkus 0.25.0). Мне удалось это сделать, но потребовалось много экспериментов, и я полагаю, что это взлом, который, надеюсь, будет исправлен в более поздних версиях Quarkus.

Принцип заключается в том, что метод @Outgoing должен производить поток , который контролируется внешне . Это достигается созданием потока через Flowable.create() в методе @PostConstruct и выставлением эмиттера члену класса. Метод @Outgoing просто возвращает уже созданный поток.

Следующий компонент предоставляет один открытый метод produce(String message), который отправит это текстовое сообщение Кафке:

package ...

import java.util.UUID;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.smallrye.reactive.messaging.kafka.KafkaMessage;

@ApplicationScoped
public class KafkaController {

    private FlowableEmitter<KafkaMessage<String, String>> emitter;

    private Flowable<KafkaMessage<String, String>> outgoingStream;

    @PostConstruct
    void init() {
        outgoingStream = Flowable.create(emitter -> this.emitter = emitter, BackpressureStrategy.BUFFER);
    }

    public void produce(String message) {
        emitter.onNext(KafkaMessage.of(UUID.randomUUID().toString(), message));
    }

    @PreDestroy
    void dispose() {
        emitter.onComplete();
    }

    @Outgoing("internal")
    Publisher<KafkaMessage<String, String>> produceKafkaMessage() {
        return outgoingStream;
    }

    @Incoming("internal")
    @Outgoing("kafka-test")
    KafkaMessage<String, String> transform(Message<KafkaMessage<String, String>> arg) {
        return arg.getPayload();
    }
}

Я создал этокласс в сгенерированном приложении Quarkus, как описано здесь :

mvn io.quarkus:quarkus-maven-plugin:0.25.0:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-quickstart \
    -Dextensions="kafka"

И настроен (application.properties) следующим образом:

kafka.bootstrap.servers=localhost:9092

mp.messaging.outgoing.kafka-test.connector=smallrye-kafka
mp.messaging.outgoing.kafka-test.topic=test
mp.messaging.outgoing.kafka-test.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kafka-test.value.serializer=org.apache.kafka.common.serialization.StringSerializer

Запущен экземпляр Kafkaв точности как описано в quickstart . Вы можете посмотреть тему test с помощью консольного прослушивателя следующим образом:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic test --from-beginning --group test-console.consumer

Чтобы проверить его, вы можете создать ресурс JAX-RS для вызова produce():

package ...

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

@Path("/control")
public class KafkaProduceControlResource {

    @Inject
    KafkaController kafkaController;

    @POST
    @Path("/produce")
    public void produceMessage(String message) {
        kafkaController.produce(message);
    }
}

Вызовите его из командной строки следующим образом и наблюдайте за потребителем консоли:

curl -i -s -X POST -d "A text message" \
    http://localhost:8080/control/produce

HACK: Кажется, что аннотировать produceKafkaMessage() с помощью @Outgoing("kafka-test") не удается, потому что Quarkus НЕ понимаетчто KafkaMessage равен a Message и оборачивает его в один, что приводит к ошибкам сериализации. Я обхожу это с потоком "internal".

...