MappingException "Couldn't find PersistentEntity for type class [B!" when deploying a stream with the MongoDB Sink
0 votes
/ September 30, 2018

I run into this problem when I try to use the MongoDB Sink app starter with data read by a JDBC source:

MappingException: Couldn't find PersistentEntity for type class [B!

The class [B means that the payload is a byte[]. I printed it to the log, and it is the JSON of the object. Is this a bug? The other sinks work perfectly.
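For readers unfamiliar with the `[B` notation, here is a minimal sketch in plain Java showing where that name comes from. The class name `ByteArrayClassName` and the helper `typeName` are made up for illustration; only `Class.getName()` and `Class.toString()` are standard JDK calls.

```java
public class ByteArrayClassName {

    // Hypothetical helper: returns the JVM's name for the runtime type of a payload.
    public static String typeName(Object payload) {
        return payload.getClass().getName();
    }

    public static void main(String[] args) {
        byte[] payload = new byte[26];

        // "[" marks an array dimension and "B" is the JVM code for the primitive byte.
        System.out.println(typeName(payload));      // [B

        // Class.toString() prefixes the name with "class ", which matches the
        // "type class [B" fragment in the exception message.
        System.out.println(payload.getClass());     // class [B
    }
}
```

So the exception is saying that the sink received raw bytes rather than a mapped object or a String.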

I am using Spring Cloud Data Flow 1.6.3 with a local deployment. The app starters are version Darwin.SR1.

And this is the stream definition:

jdbc --password='mypass' --query='SELECT id, name FROM mytable WHERE imported = false' --max-rows-per-poll=1000 --update='UPDATE mytable SET imported = true WHERE id in (:id)' --fixed-delay=30 --time-unit=SECONDS --driver-class-name=org.postgresql.Driver --url=jdbc:postgresql://localhost:5432/mydatabase --username=postgres | mongodb --database=main --port=27017 --host=localhost --collection=mycollection --uri=mongodb://localhost/main

Full Stacktrace:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1506) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1417) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817) [spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801) [spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) [spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042) [spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'SystemOutAndSink.systemOutProcessor.SystemOutAndSink.errors'; nested exception is org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:163) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:475) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:98) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.integration.support.ErrorMessagePublisher.publish(ErrorMessagePublisher.java:164) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer.recover(ErrorMessageSendingRecoverer.java:83) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:512) ~[spring-retry-1.2.2.RELEASE.jar!/:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:351) ~[spring-retry-1.2.2.RELEASE.jar!/:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.2.RELEASE.jar!/:na]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:211) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    ... 8 common frames omitted
Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted
    at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:45) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder$3.handleMessage(RabbitMessageChannelBinder.java:513) ~[spring-cloud-stream-binder-rabbit-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    ... 21 common frames omitted
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: org.springframework.messaging.MessageHandlingException: error occurred in message handler [mongoDbSinkMessageHandler]; nested exception is org.springframework.data.mapping.MappingException: Couldn't find PersistentEntity for type class [B!, failedMessage=GenericMessage [payload=byte[26], headers={amqp_receivedDeliveryMode=PERSISTENT, sequenceNumber=1, amqp_receivedExchange=SystemOutAndSink.systemOutProcessor, amqp_deliveryTag=1, sequenceSize=1, deliveryAttempt=3, amqp_consumerQueue=SystemOutAndSink.systemOutProcessor.SystemOutAndSink, amqp_redelivered=false, amqp_receivedRoutingKey=SystemOutAndSink.systemOutProcessor, correlationId=33566e38-0f55-25be-40d6-5ef68ca496b2, id=e636d851-dc51-dbc3-99d6-3705ef71db59, amqp_consumerTag=amq.ctag-4DTfbrWGYdijPJZnyJzGpQ, contentType=application/json, timestamp=1538314765609}]
    ... 27 common frames omitted
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [mongoDbSinkMessageHandler]; nested exception is org.springframework.data.mapping.MappingException: Couldn't find PersistentEntity for type class [B!
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:47) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:60) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:214) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.2.RELEASE.jar!/:na]
    ... 11 common frames omitted
Caused by: org.springframework.data.mapping.MappingException: Couldn't find PersistentEntity for type class [B!
    at org.springframework.data.mapping.context.MappingContext.getRequiredPersistentEntity(MappingContext.java:76) ~[spring-data-commons-2.0.8.RELEASE.jar!/:2.0.8.RELEASE]
    at org.springframework.data.mongodb.core.convert.MappingMongoConverter.writeInternal(MappingMongoConverter.java:435) ~[spring-data-mongodb-2.0.8.RELEASE.jar!/:2.0.8.RELEASE]
    at org.springframework.data.mongodb.core.convert.MappingMongoConverter.write(MappingMongoConverter.java:391) ~[spring-data-mongodb-2.0.8.RELEASE.jar!/:2.0.8.RELEASE]
    at org.springframework.data.mongodb.core.convert.MappingMongoConverter.write(MappingMongoConverter.java:86) ~[spring-data-mongodb-2.0.8.RELEASE.jar!/:2.0.8.RELEASE]
    at org.springframework.data.mongodb.core.MongoTemplate.toDocument(MongoTemplate.java:1070) ~[spring-data-mongodb-2.0.8.RELEASE.jar!/:2.0.8.RELEASE]
    at org.springframework.data.mongodb.core.MongoTemplate.doSave(MongoTemplate.java:1253) ~[spring-data-mongodb-2.0.8.RELEASE.jar!/:2.0.8.RELEASE]
    at org.springframework.data.mongodb.core.MongoTemplate.save(MongoTemplate.java:1201) ~[spring-data-mongodb-2.0.8.RELEASE.jar!/:2.0.8.RELEASE]
    at org.springframework.integration.mongodb.outbound.MongoDbStoringMessageHandler.handleMessageInternal(MongoDbStoringMessageHandler.java:124) ~[spring-integration-mongodb-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    ... 28 common frames omitted

1 Answer

0 votes
/ October 1, 2018

As long as MongoTemplate.save() (essentially mappingContext.getPersistentEntity(type)) does not support a plain byte[] (or, similarly, a ByteBuffer or just an InputStream), we have no choice but to convert the incoming byte[] to a String to make it work.

So, for this purpose you need to write a simple jar with auto-configuration that converts the payload, so that a String is passed to MongoTemplate.

Feel free to raise an issue against the MongoDB App Starter to include this functionality out of the box.
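The byte[]-to-String conversion described in this answer could look roughly like the sketch below. It is plain JDK code, not the answer's own snippet and not part of any app starter: the class name `PayloadToString` and the method `toStringIfBytes` are hypothetical, UTF-8 is an assumption about how the JSON was encoded, and the Spring wiring (auto-configuration, registering the conversion in front of the sink's handler) is deliberately left out.

```java
import java.nio.charset.StandardCharsets;

public class PayloadToString {

    // Hypothetical conversion step: decode raw bytes into a String and
    // pass any other payload type through unchanged.
    // Assumption: the bytes are UTF-8 encoded JSON (contentType=application/json).
    public static Object toStringIfBytes(Object payload) {
        if (payload instanceof byte[]) {
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        return payload;
    }

    public static void main(String[] args) {
        // Simulates what arrives from the binder: JSON serialized to bytes.
        byte[] raw = "{\"id\":1,\"name\":\"test\"}".getBytes(StandardCharsets.UTF_8);

        Object converted = toStringIfBytes(raw);

        // The sink would now see a String instead of a byte[].
        System.out.println(converted.getClass().getName());
        System.out.println(converted);
    }
}
```

Passing non-byte payloads through untouched keeps the step safe to apply even when an upstream app already emits Strings or mapped objects.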

...