реактивный потребитель кафки с веб-клиентом - без противодавления - PullRequest
0 голосов
/ 06 февраля 2020

У меня есть приложение Spring Boot, использующее Reactor Kafka Consumer. Использованные сообщения должны быть отправлены на веб-сервис. Каждый HTTP-запрос должен содержать х сообщений. После того, как веб-служба отправляет HTTP-ответ, я хочу отправить следующий запрос со следующими x сообщениями.

Я использую Flux.window(int size), чтобы принять x сообщений и отправить с использованием Flux.flatMap(). В flatMap () я использую Spring WebClient.

В настоящее время windows перекрываются. Это приводит к отправке слишком большого количества запросов, что слишком много для веб-службы.

Это мой код:

@Service
public class KafkaConsumerService {
    private final ReceiverOptions<String, String> receiverOptions;
    @Value("${baseurl}")
    private String baseUrl;

    @Value("${windowSize}")
    private int windowSize;

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);
    private final WebClient webClient;

    private Disposable stream;

    @Autowired
    public KafkaConsumerService(ReceiverOptions<String, String> receiverOptions) {
        this.receiverOptions = receiverOptions;
        this.webClient = WebClient
                .builder()
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_STREAM_JSON_VALUE)
                .build();
    }

    @PostConstruct
    public void startPipeline() {
        KafkaReceiver.create(receiverOptions)
                .receiveAutoAck()
                .concatMap(r -> r)
                .map(ConsumerRecord::value)
                .window(windowSize)
                .flatMap(this::sendToWebservice)
                .subscribe();
    }

    private Publisher<String> sendToWebservice(Flux<String> windowFlux) {
        LOGGER.info("===== sending...");
        return webClient
                .post()
                .uri(URI.create(baseUrl + "items"))
                .contentType(MediaType.APPLICATION_STREAM_JSON)
                .body(windowFlux, String.class)
                .retrieve()
                .bodyToMono(String.class)
                .retryWhen(Retry.any()
                        .exponentialBackoff(Duration.ofSeconds(1), Duration.ofSeconds(10))
                        .retryMax(15))
                .doOnError((error) -> LOGGER.error("===== ERROR in sending request"))
                .doOnSuccess((x) -> LOGGER.info("===== Success"));
    }
}

Текущие выходные данные выглядят, но это не желаемый результат:

===== sending...
===== sending...
===== sending...
===== sending...
===== sending...
===== sending...
===== sending...
===== sending...
===== sending...
===== sending...
===== Success
===== Success
===== Success
===== Success
===== Success
===== Success
===== Success
===== Success
===== Success
===== Success

Как заставить окно ждать завершения предыдущего окна? Разве не должно быть обратного давления от подписки ответа веб-клиента, что замедляет работу издателя (= получателя kafka)? Является ли окно правильным оператором или есть что-то еще, что я должен использовать?

...