Реактор проекта: бесконечный (бесконечный) поток - PullRequest
2 голосов
/ 08 ноября 2019

Я создал издателя с использованием метода Flux.create. Другой поток отправляет данные в FluxSink, данные преобразуются и отправляются в систему обмена сообщениями. Этот поток должен быть вокруг, пока приложение работает. Вот код:

public MyClass {
  private final FluxSink<Request> requestsSink;
  private final Flux<Request> flux;

  MyClass() {
    this.flux = Flux.<Request>create(sink -> this.requestsSink = sink)
            .bufferTimeout(props.getMaxSize(), props.getMaxTime())
            .flatMap(this::mergeRequests)
            .flatMap(
                req ->
                    remoteApi
                        .getData(req)
                        .onErrorResume(t -> Mono.empty()))
            .doOnNext(this::sendToTopic)
            .onErrorResume(
                t -> {
                  log.warn("error", t);
                  return Flux.empty();
                })
            .subscribe();
  }

  public void pushData(Request request) {
    requestsSink.next(request);
  }

  ... other methods
}

Итак, мне все равно, если где-то произойдет ошибка, я просто хочу зарегистрировать ее и продолжить обработку других записей.

По какой-то причине этопоток get утилизируется при неизвестных обстоятельствах (isDisposed возвращает true). Это происходит при высокой нагрузке.

  • Как предотвратить отмену / утилизацию / отделку флюса?
  • В каких условиях флюс можно отменить / утилизировать / завершить?
  • Как я могу отладить / зарегистрировать поток, чтобы понять причину отмены / удаления?
...