We are building event-driven microservices using Spring Cloud Stream (2.1.2.RELEASE) with the Rabbit binder, and we are running into problems with its integration with Spring Cloud Function (2.0.1.RELEASE).
Our stream source works as follows:
- Read a CSV file from an SFTP server and copy it to the local machine
- Split the content of the local file line by line
- Convert each CSV line into a custom object (using a function)
- Publish the result to the output channel of the built-in Source
We use the following (simplified) Java DSL for the first two steps:
@EnableBinding(Source.class)
public class SftpSource {

    @Autowired
    private Source source; // binding interface that exposes the output channel

    @Bean
    public IntegrationFlow flow() {
        SftpInboundChannelAdapterSpec messageSourceBuilder = ... ;
        return IntegrationFlows.from(messageSourceBuilder, c -> c.poller(...))
                .split(new FileSplitter(false)) // one message per line of the file
                .channel(this.source.output())
                .get();
    }
}
And a function that handles the third step:
@Bean
public Function<String, MyPojo> myConverter() {
    return csvLine -> {
        try {
            // Convert csvLine to MyPojo here
            return myPojo;
        } catch (IOException e) {
            // Throw custom error
            throw new CustomException(e);
        }
    };
}
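To make the conversion step concrete, here is a minimal, Spring-free sketch of a converter with the same `Function<String, MyPojo>` shape. The two-field `MyPojo`, the `id,name` line layout, and the two-argument `CustomException` constructor are assumptions made only for illustration. Our real converter uses a CSV parser and catches `IOException` instead.

```java
import java.util.function.Function;

// Hypothetical target type; the real MyPojo is not shown in this post.
class MyPojo {
    final int id;
    final String name;

    MyPojo(int id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Hypothetical unchecked exception standing in for our CustomException.
class CustomException extends RuntimeException {
    CustomException(String message, Throwable cause) {
        super(message, cause);
    }
}

class CsvConverterSketch {

    // Same shape as the myConverter bean, minus the Spring annotations.
    static Function<String, MyPojo> myConverter() {
        return csvLine -> {
            try {
                // Assumed layout: "<numeric id>,<name>"
                String[] fields = csvLine.split(",", -1);
                if (fields.length != 2) {
                    throw new IllegalArgumentException(
                            "expected 2 fields, got " + fields.length);
                }
                return new MyPojo(Integer.parseInt(fields[0].trim()), fields[1].trim());
            } catch (IllegalArgumentException e) {
                // NumberFormatException is a subclass of IllegalArgumentException,
                // so malformed ids end up here as well.
                throw new CustomException("Cannot convert line: " + csvLine, e);
            }
        };
    }

    public static void main(String[] args) {
        MyPojo pojo = myConverter().apply("42,Alice");
        System.out.println(pojo.id + " " + pojo.name); // prints "42 Alice"
    }
}
```

A line such as `not-a-number,Bob` makes the function throw `CustomException`, which corresponds to the conversion errors mentioned in this post.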
In application.properties (or on the command line):
spring.cloud.stream.function.definition=myConverter
This works as expected when the file contains only a few lines (even if some of them fail conversion and are routed to the global error channel), but as soon as the file contains many lines (some of which fail conversion), the following error is thrown:
2019-06-05 19:29:15.190 INFO 26096 --- [ask-scheduler-1] o.s.c.s.m.DirectWithAttributesChannel : Channel 'MyApp.output' has 0 subscriber(s).
2019-06-05 19:29:15.194 ERROR 26096 --- [ask-scheduler-1] reactor.Flux.OnAssembly.1 : | onError(reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure)
2019-06-05 19:29:15.221 ERROR 26096 --- [ask-scheduler-1] reactor.Flux.OnAssembly.1
reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215) ~[reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:239) [reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:345) ~[reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
at reactor.core.publisher.FluxCreate$IgnoreSink.next(FluxCreate.java:618) ~[reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153) ~[reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:246) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:234) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93) ~[spring-context-5.1.7.RELEASE.jar:5.1.7.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_131]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_131]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_131]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxConcatMap] :
reactor.core.publisher.Flux.concatMap(Flux.java:3445)
org.springframework.cloud.stream.function.FunctionInvoker.apply(FunctionInvoker.java:128)
Error has been observed by the following operator(s):
|_ Flux.concatMap ⇢ org.springframework.cloud.stream.function.FunctionInvoker.apply(FunctionInvoker.java:128)
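For readers unfamiliar with the message, our reading of "Queue is full" is that a producer keeps pushing items into a bounded buffer regardless of how much the consumer has requested. The sketch below reproduces only that generic situation with a plain `ArrayBlockingQueue`. It does not use Reactor or Spring, the capacity of 32 is an arbitrary value chosen for illustration, and we do not know whether this is what actually happens inside `FunctionInvoker`.

```java
import java.util.concurrent.ArrayBlockingQueue;

class OverflowSketch {

    // Pushes `lines` items into a bounded queue that nobody drains and
    // returns how many items were accepted before the first rejection.
    static int pushWithoutDemand(int lines, int capacity) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < lines; i++) {
            // offer() returns false instead of blocking when the queue is full;
            // a push-style producer has no way to slow down at this point.
            if (!queue.offer("line-" + i)) {
                return i;
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // A small file fits into the buffer entirely.
        System.out.println(pushWithoutDemand(10, 32));   // prints 10
        // A large file overflows once the buffer holds 32 items.
        System.out.println(pushWithoutDemand(1000, 32)); // prints 32
    }
}
```

This matches the symptom described above in one respect only: a short file stays under the buffer limit, while a long one exceeds it.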
We have spent a lot of time debugging this without success, so any help would be greatly appreciated.