Как отправить сообщение с ключом в Kafka, используя Spring Cloud Stream Supplier - PullRequest
1 голос
/ 14 февраля 2020

Я хочу использовать Spring Cloud Stream для создания сообщений с ключами (сообщения с указанным c ключом) для Kafka.

@SpringBootApplication
public class SpringCloudStreamKafkaApplication {

  public static void main(String[] args) {
    SpringApplication.run(SpringCloudStreamKafkaApplication.class, args);
  }

  @Bean
  Supplier<DataRecord> process(){
    return () -> new DataRecord(42L);
  }

}

Что мне нужно изменить в коде поставщика для предоставления ключа? Возможно ли это в новом стиле API (используя лямбды)?

Спасибо

1 Ответ

1 голос
/ 14 февраля 2020

Возвращает Message<?> и устанавливает заголовок KafkaHeaders.MESSAGE_KEY:

@Bean
Supplier<Message<String>> process() {
    return () -> MessageBuilder.withPayload("foo")
            .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
            .build();
}

(предполагается, что сериализатор ключей по умолчанию (байт []).

РЕДАКТИРОВАТЬ

Это будет вызываться бесконечно.

Если вы хотите отправить конечный поток, я считаю, что вам нужно переключиться на реактивную модель.

@Bean
Supplier<Flux<Message<String>>> processFinite() {
    Message<String> msg1 = MessageBuilder.withPayload("foo")
            .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
            .build();
    Message<String> msg2 = MessageBuilder.withPayload("baz")
            .setHeader(KafkaHeaders.MESSAGE_KEY, "qux".getBytes())
            .build();
    return () -> {
        return Flux.just(msg1, msg2);
    };
}

Существует также Flux.fromStream(myStream).

Который закончится в конце потока.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...