Поддержка составных функций с исключением выброса потокового облака: «Очередь заполнена: источник Reactive Streams не учитывает противодавление» - PullRequest
1 голос
/ 05 июня 2019

Мы создаем управляемые событиями микросервисы с использованием Spring Cloud Stream (2.1.2.RELEASE) с привязкой Rabbit и сталкиваемся с некоторыми проблемами, связанными с его интеграцией с функцией Spring Cloud (2.0.1.RELEASE).

Наш источник потока выглядит следующим образом:

  1. Считать файл CSV на SFTP-сервере и скопировать его на локальный
  2. Разделить содержимое локального файла по строке
  3. Преобразование строки CSV в пользовательский объект (с помощью функции)
  4. Публикация на выходном канале встроенного источника

Мы используем следующую (упрощенную) Java DSL для описания шагов 1 и 2:

@EnableBinding(Source.class)
public class SftpSource { 


@Bean
public IntegrationFlow flow() {
    SftpInboundChannelAdapterSpec messageSourceBuilder = ... ;
    return IntegrationFlows.from(messageSourceBuilder, c -> c.poller(...)
            .split(new FileSplitter(false));
            .channel(this.source.output())
            .get();
}
}

И функция для работы с шагом 3:

@Bean
public Function<String, MyPojo> myConverter(){
    return csvLine -> {
        try {
            // Convert csvLine to MyPojo here
            return myPojo;
        } catch (IOException e) {
            // Throw custom error 
            throw new CustomException(e);
        }
    };
}

В application.properties (или командной строке):

spring.cloud.stream.function.definition=myConverter

Он работает, как и ожидалось, когда файл содержит несколько строк (даже если есть некоторые ошибки преобразования, перенаправленные в канал глобальной ошибки), но как только файл содержит много строк (содержащих некоторые ошибки преобразования), следующее выдается ошибка:

2019-06-05 19:29:15.190  INFO 26096 --- [ask-scheduler-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'MyApp.output' has 0 subscriber(s).
2019-06-05 19:29:15.194 ERROR 26096 --- [ask-scheduler-1] 
reactor.Flux.OnAssembly.1                : | 
onError(reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure)
2019-06-05 19:29:15.221 ERROR 26096 --- [ask-scheduler-1] reactor.Flux.OnAssembly.1       
reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215) ~[reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:239) [reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:345) ~[reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
    at reactor.core.publisher.FluxCreate$IgnoreSink.next(FluxCreate.java:618) ~[reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
    at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153) ~[reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:246) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:234) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93) ~[spring-context-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_131]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_131]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_131]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_131]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_131]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxConcatMap] :
    reactor.core.publisher.Flux.concatMap(Flux.java:3445)
    org.springframework.cloud.stream.function.FunctionInvoker.apply(FunctionInvoker.java:128)
Error has been observed by the following operator(s):
    |_      Flux.concatMap ? org.springframework.cloud.stream.function.FunctionInvoker.apply(FunctionInvoker.java:128)

Мы потратили много времени на отладку, но безуспешно, любая помощь будет принята с благодарностью.

...