Преобразование будущего Scala в поток реакторов - PullRequest
0 голосов
/ 01 октября 2019

Я вызываю API, который возвращает Future of Scala, и я хочу преобразовать в Reactor Flux, очевидно, без блокировки и ожидаю ответа в будущем. Я пытаюсь использовать EmitterProcessor, и он прекрасно работает, когда будущее успешное, но, к сожалению, это не так, когда в будущем есть Тайм-аут.

Здесь код

private Flux<D> transformFutureToFlux(Future<T> future) {
    EmitterProcessor<D> emitterProcessor = EmitterProcessor.create();
    future.onComplete(getOnCompleteFunction(emitterProcessor), defaultExecutionContext());
    return Flux.from(emitterProcessor);
}

private <D> OnComplete getOnCompleteFunction(EmitterProcessor<D> emitterProcessor) {
    return new OnComplete() {
        @Override
        public void onComplete(Throwable t, Object result) {
            if (t != null) {
                processError(t);
            } else {
                processSucceed(result);
            }
        }

        private void processSucceed(Object result) {
            if (result instanceof ConnectorErrorVO) {
                publishConnectorErrorException((ConnectorErrorVO) result);
            } else {
                publishGenericResponse(result);
            }
        }

        private void publishGenericResponse(Object result) {
            if (result instanceof List<?>) {
                Flux.fromIterable((List<D>) result).subscribe(emitterProcessor);
            } else {
                Flux.just((D) result).subscribe(emitterProcessor);
            }
        }

        private void publishConnectorErrorException(ConnectorErrorVO result) {
            ConnectorErrorVO connectorErrorVO = result;
            Flux<D> error = Flux.error(new ConnectorErrorException( String.valueOf(connectorErrorVO.getCode()), connectorErrorVO.getDescription(), connectorErrorVO.getError()));
            error.subscribe(emitterProcessor);
        }

        private void processError(Throwable t) {
            ConnectorManagerExecutor.logger.error(null, "Error and recovery from connector manager transaction", t);
            if (t instanceof AskTimeoutException) {
                Flux.<D>error(new ConnectorErrorException("407", "connector timeout", ConnectorError.TIMEOUT)).subscribe(emitterProcessor);
            } else {
                Flux.<D>error(new ConnectorErrorException("500", t.getMessage(), ConnectorError.GENERIC)).subscribe(emitterProcessor);
            }
        }
    };

То, что я пытаюсь сделать, правильно? Есть ли лучший способ сделать это?

С уважением

Ответы [ 2 ]

1 голос
/ 11 октября 2019

Если вы возвращаете Future, это означает, что возвращается только 1, а не серия / поток возвращаемого значения. Вы можете конвертировать в Flux, если хотите, но почему бы не использовать вместо него Mono?

Кроме того, если вы используете scala, попробуйте использовать processor-scala-extensions *1007* SMono.fromFuture

Проверьте SMonoTest для примера.

    import scala.concurrent.ExecutionContext.Implicits.global
    StepVerifier.create(SMono.fromFuture(Future[Long] {
      randomValue
    }))
      .expectNext(randomValue)
      .verifyComplete()
0 голосов
/ 03 октября 2019

Вы пробовали Flux.create()? Вместо использования процессора вы регистрируете будущий обратный вызов, который вызывает предоставленные Sink методы. Например. для случая List вы должны выполнить итерацию по списку и передать каждое значение методом sink.next(T).

...