Я получаю данные из списка URL-адресов из конечной точки REST в Flux,
Flux.fromIterable(sensorUrls)
.publishOn(Schedulers.boundedElastic())
.map(url -> webClient.get()
.uri(url)
.retrieve()
.bodyToFlux(Object.class)
.retry()
.subscribe(this::sendMessage)
).subscribe();
Метод отправки сообщения должен отправлять данные в систему Kafka.
Моя конфигурация Kafka такая ниже
public static KafkaSender<String, String> createKafkaSender(){
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<String, String> senderOptions = SenderOptions.create(props);
return KafkaSender.create(senderOptions);
}
и метод отправки сообщения:
private void sendMessage(Object data) {
Flux<ProducerRecord<Integer, String>> flux = Flux.just(new ProducerRecord<>("test-topic", 2, gson.toJson(data)));
kafkaSender.createOutbound()
.sendTransactionally(flux) // ISSUE
.then()
.doOnError(a -> logger.error("error"))
.doOnSuccess(a -> logger.error("success"));
}
}
Ошибка компилятора показывает мне:
Required type: Publisher <? extends Publisher<? extends ProducerRecord<String, String>>>
Provided: Flux<ProducerRecord<Integer, String>>
Любое предложение или помощь по преобразованию в тип издателя чтобы заставить его работать?