У меня есть локальный кластер kubernetes, и я пытаюсь создать поток SCDF следующим образом:
file --file.directory=/home | aggregator --aggregator.group-timeout=10000 --aggregator.correlation=T(Thread).currentThread().id --spring.cloud.stream.bindings.output.contentType=application/octet-stream aggregator.aggregation=#root.![payload].toString() | log
Я пытаюсь добиться следующего: любой файл (в любом количестве и любого типа), который помещается в папку /home
в модуле file, должен быть передан в агрегатор, который объединяет содержимое (byte[]) файлов в одно сообщение, и это содержимое должно быть выведено в лог модулем log.
Поток развёртывается без проблем, но конвейер застревает в агрегаторе со следующей ошибкой:
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload=[[B@7eb1bcc6], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=aha.file, amqp_deliveryTag=1, file_name=dasads, deliveryAttempt=1, amqp_consumerQueue=aha.file.aha, amqp_redelivered=false, file_originalFile=/home/dasads, file_relativePath=dasads, amqp_receivedRoutingKey=aha.file, amqp_timestamp=Wed Mar 04 02:09:09 GMT 2020, amqp_messageId=32196ebe-c66c-a3c6-c647-47d297385963, id=2b72fb4d-0259-62cc-d2d4-c9b06b9b4afe, amqp_consumerTag=amq.ctag-tTOyFxlowoCV9AE_Gmj0BQ, contentType=application/octet-stream, timestamp=1583287759048}]' to outbound message.
at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:166) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:483) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.9.RELEASE.jar!/:5.1.9.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.9.RELEASE.jar!/:5.1.9.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.9.RELEASE.jar!/:5.1.9.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.9.RELEASE.jar!/:5.1.9.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:823) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:785) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.expireGroup(AbstractCorrelatingMessageHandler.java:759) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.forceComplete(AbstractCorrelatingMessageHandler.java:680) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$ForceReleaseMessageGroupProcessor.processMessageGroup(AbstractCorrelatingMessageHandler.java:940) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.processForceRelease(AbstractCorrelatingMessageHandler.java:597) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.lambda$scheduleGroupToForceComplete$3(AbstractCorrelatingMessageHandler.java:567) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.1.9.RELEASE.jar!/:5.1.9.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_192]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_192]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_192]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_192]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_192]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_192]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_192]
Caused by: java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload=[[B@7eb1bcc6], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=aha.file, amqp_deliveryTag=1, file_name=dasads, deliveryAttempt=1, amqp_consumerQueue=aha.file.aha, amqp_redelivered=false, file_originalFile=/home/dasads, file_relativePath=dasads, amqp_receivedRoutingKey=aha.file, amqp_timestamp=Wed Mar 04 02:09:09 GMT 2020, amqp_messageId=32196ebe-c66c-a3c6-c647-47d297385963, id=2b72fb4d-0259-62cc-d2d4-c9b06b9b4afe, amqp_consumerTag=amq.ctag-tTOyFxlowoCV9AE_Gmj0BQ, contentType=application/octet-stream, timestamp=1583287759048}]' to outbound message.
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:389) ~[spring-cloud-stream-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:423) ~[spring-cloud-stream-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:608) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:443) ~[spring-integration-core-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
... 24 common frames omitted
Я даже пытался явно указать в агрегаторе тип содержимого вывода application/octet-stream,
чтобы он соответствовал тому, что выдаёт источник file, но безуспешно. Что я упускаю? Спасибо за любую информацию!
Версии зависимостей:
- SCDF: springcloud/spring-cloud-dataflow-server:2.5.0.BUILD-SNAPSHOT
- Источник file: docker: springcloudstream/file-source-rabbit:2.1.1.RELEASE
- Процессор-агрегатор: docker: springcloudstream/aggregator-processor-rabbit:2.1.1.RELEASE