Бесконечный поток и массовая запись в базу данных - PullRequest
0 голосов
/ 18 февраля 2019

У меня есть бесконечный поток (от kafka, использующего реактор-kafka) событий, которые я пытаюсь записать в виде пакетов в базу данных, прежде чем перейти к фактической обработке событий.Моя проблема состоит в том, чтобы заставить это работать с надлежащим противодавлением.

windowTimeout и bufferTimeout показались хорошими кандидатами, поскольку они позволили мне указать как максимальный размер, так и ограничить время ожидания в случае низкого «трафика».

Сначалаoff был windowTimeout, из которого массовые записи были сделаны в БД.Это, однако, быстро оказалось проблематичным: реактор.кор. Исключения $ OverflowException: приемник переполнен большим количеством сигналов, чем ожидалось (ограниченная очередь ...)) .

Затем я переключаюсь наbufferTimeout, но не удалось с ошибкой processor.core.Exceptions $ OverflowException: не удалось создать буфер из-за отсутствия запросов .

Я надеюсь, что следующее иллюстрирует поток, который япосле:

flux.groupBy(envelope -> envelope.partition)
  .flatMap(partitionFlux -> {
    final Flux<ConsumedEnvelope> elasticFlux = partitionFlux.publishOn(Schedulers.elastic());
    final Flux<List<ConsumedEnvelope>> batchFlux = partitionFlux.bufferTimeout(100, Duration.ofMillis(500))
      .concatMap(batch -> {
        final ConsumedEnvelope last = batch.get(batch.size() - 1);

        return repository.persist(batch) // a)
          .then(last.acknowledge()) // b)
          .thenReturn(batch);
      });

    return processing(batchFlux);
  })
  .subscribe(result -> {
      // ...
  });

a) repository.persist внутренне ничего не делает, а выполняет итерацию пакета для создания операции вставки, а затем возвращает Mono<Void>.

b) ConsumedEnvelope.acknowledge ()для смещения Кафки, что я хочу сделать только после успешного сохранения пакета.Все это заключено в concatMap, чтобы обрабатывать только один пакет за раз для каждого раздела.

Как упоминалось выше, это приводит к исключению переполнения.Есть ли идиоматические способы достижения того, что я пытался описать?Мне кажется, что это не должно быть слишком необычной задачей, но я новичок в реакторе и хотел бы получить совет.

/ d

РЕДАКТИРОВАТЬ Я понял, что просто добавивonBackpressureBuffer на самом деле решает это ОК для меня.Но в целом, есть ли лучшие способы сделать это?

РЕДАКТИРОВАТЬ 2 ... выше, конечно, вызвало проблемы из-за несвязанного спроса, который я почему-то пропустил.Итак, вернемся к исходной проблеме или, возможно, к тому, чтобы onBackpressureBuffer НЕ запрашивал несвязанный запрос, а только перенаправлял то, что запрашивается из нисходящего потока.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...