У меня есть приложение Spring Boot, использующее Reactor Kafka Consumer. Использованные сообщения должны быть отправлены на веб-сервис. Каждый HTTP-запрос должен содержать х сообщений. После того, как веб-служба отправляет HTTP-ответ, я хочу отправить следующий запрос со следующими x сообщениями.
Я использую Flux.window(int size)
, чтобы принять x сообщений и отправить с использованием Flux.flatMap()
. В flatMap () я использую Spring WebClient.
В настоящее время windows перекрываются. Это приводит к отправке слишком большого количества запросов, что слишком много для веб-службы.
Это мой код:
@Service
public class KafkaConsumerService {
private final ReceiverOptions<String, String> receiverOptions;
@Value("${baseurl}")
private String baseUrl;
@Value("${windowSize}")
private int windowSize;
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);
private final WebClient webClient;
private Disposable stream;
@Autowired
public KafkaConsumerService(ReceiverOptions<String, String> receiverOptions) {
this.receiverOptions = receiverOptions;
this.webClient = WebClient
.builder()
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_STREAM_JSON_VALUE)
.build();
}
@PostConstruct
public void startPipeline() {
KafkaReceiver.create(receiverOptions)
.receiveAutoAck()
.concatMap(r -> r)
.map(ConsumerRecord::value)
.window(windowSize)
.flatMap(this::sendToWebservice)
.subscribe();
}
private Publisher<String> sendToWebservice(Flux<String> windowFlux) {
LOGGER.info("===== sending...");
return webClient
.post()
.uri(URI.create(baseUrl + "items"))
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(windowFlux, String.class)
.retrieve()
.bodyToMono(String.class)
.retryWhen(Retry.any()
.exponentialBackoff(Duration.ofSeconds(1), Duration.ofSeconds(10))
.retryMax(15))
.doOnError((error) -> LOGGER.error("===== ERROR in sending request"))
.doOnSuccess((x) -> LOGGER.info("===== Success"));
}
}
Текущие выходные данные выглядят, но это не желаемый результат:
===== sending...
===== sending...
===== sending...
===== sending...
===== sending...
===== sending...
===== sending...
===== sending...
===== sending...
===== sending...
===== Success
===== Success
===== Success
===== Success
===== Success
===== Success
===== Success
===== Success
===== Success
===== Success
Как заставить окно ждать завершения предыдущего окна? Разве не должно быть обратного давления от подписки ответа веб-клиента, что замедляет работу издателя (= получателя kafka)? Является ли окно правильным оператором или есть что-то еще, что я должен использовать?