Как ввести KafkaTemplate в Quarkus - PullRequest
0 голосов
/ 20 мая 2019

Я пытаюсь ввести KafkaTemplate для отправки одного сообщения. Я разрабатываю небольшую функцию, которая лежит за пределами реактивного подхода.

Я могу найти только примеры, которые используют @Ingoing и @Outgoing от Smallrye, но мне не нужен KafkaStream.

Я пытался с Kafka-CDI, но я не могу ввести SimpleKafkaProducer.

Есть идеи?

Для ответа Климента

Кажется, правильное направление, но при выполнении orders.send("hello"); я получаю эту ошибку:

(vert.x-eventloop-thread-3) Unhandled exception:java.lang.IllegalStateException: Stream not yet connected

Из моей темы я использую командную строку, Кафка запущена и работает, если я создаю вручную, я могу видеть использованные сообщения.

Кажется, относительно этого предложения от документа:

Чтобы использовать Emitter для потока hello, вам нужен @Incoming ("hello") где-то в вашем коде (или в вашей конфигурации).

У меня в классе есть этот код:

    @Incoming("orders")
    public CompletionStage<Void> consume(KafkaMessage<String, String> msg) {
        log.info("Received message (topic: {}, partition: {}) with key {}: {}", msg.getTopic(), msg.getPartition(), msg.getKey(), msg.getPayload());
        return msg.ack();
    }

Может быть, я забыл некоторые конфигурации?

1 Ответ

4 голосов
/ 20 мая 2019

Итак, вам просто нужно использовать Emitter:

@Inject
@Stream("orders") // Emit on the channel 'orders'
Emitter<String> orders;

// ...
orders.send("hello");

А в вашем application.properties объявите:

## Orders topic (WRITE)
mp.messaging.outgoing.orders.type=io.smallrye.reactive.messaging.kafka.Kafka
mp.messaging.outgoing.orders.topic=orders
mp.messaging.outgoing.orders.bootstrap.servers=localhost:9092
mp.messaging.outgoing.orders.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.orders.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.orders.acks=1

Чтобы исключить Stream not yet connected исключение, как предложено doc:

Чтобы использовать Emitter для потока hello, вам нужен @Incoming ("hello") где-то в вашем коде (или в вашей конфигурации).

Предполагая, что у вас есть что-то подобное в вашем приложении.properties:

# Orders topic (READ)
smallrye.messaging.source.orders-r-topic.type=io.smallrye.reactive.messaging.kafka.Kafka
smallrye.messaging.source.orders-r-topic.topic=orders
smallrye.messaging.source.orders-r-topic.bootstrap.servers=0.0.0.0:9092
smallrye.messaging.source.orders-r-topic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.orders-r-topic.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.orders-r-topic.group.id=my-group-id

Добавить что-то вроде этого:

@Incoming("orders-r-topic")
public CompletionStage<Void> consume(KafkaMessage<String, String> msg) {
    log.info("Received message (topic: {}, partition: {}) with key {}: {}", msg.getTopic(), msg.getPartition(), msg.getKey(), msg.getPayload());
    return msg.ack();
}
...