Отправка ответа Flux в Kafka - PullRequest
       72

Отправка ответа Flux в Kafka

0 голосов
/ 05 августа 2020

Я получаю данные из списка URL-адресов из конечной точки REST в Flux,

 Flux.fromIterable(sensorUrls)
                .publishOn(Schedulers.boundedElastic())
                .map(url -> webClient.get()
                        .uri(url)
                        .retrieve()
                        .bodyToFlux(Object.class)
                        .retry()
                        .subscribe(this::sendMessage)
                ).subscribe();

Метод отправки сообщения должен отправлять данные в систему Kafka.

Моя конфигурация Kafka такая ниже

 public static KafkaSender<String, String> createKafkaSender(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        SenderOptions<String, String> senderOptions = SenderOptions.create(props);
        return KafkaSender.create(senderOptions);
    }

и метод отправки сообщения:

 private void sendMessage(Object data) {
        Flux<ProducerRecord<Integer, String>> flux = Flux.just(new ProducerRecord<>("test-topic", 2, gson.toJson(data)));
        kafkaSender.createOutbound()
                .sendTransactionally(flux) // ISSUE
                .then()
                .doOnError(a -> logger.error("error"))
                .doOnSuccess(a -> logger.error("success"));
        }
    }

Ошибка компилятора показывает мне:

Required type: Publisher <? extends Publisher<? extends ProducerRecord<String, String>>>
Provided: Flux<ProducerRecord<Integer, String>>

Любое предложение или помощь по преобразованию в тип издателя чтобы заставить его работать?

...