Миграция SCDF Spring Cloud Stream с 2.1.4 на загрузку 2.2.4 прерывает выпуск kafka avro stream - PullRequest
0 голосов
/ 06 апреля 2020

У нас есть процессоры Spring Cloud, использующие методологию Spring cloud Functions. текущая используемая загрузочная версия - 2.1.4 и облачная версия Greenwich.SR1

Ниже скелет процессора

@EnableBinding(Processor.class)
public class FilterProcessor {

    @Bean
    public Function<DeviceEvent, DeviceEvent> filter() {
        return deviceEvent -> {
            // process and return data
        };
    }
}

Ниже приведена конфигурация yml приложения

spring:
  cloud:
    stream:
      default:
        content-type: application/*+avro
        producer:
          useNativeEncoding: true
        consumer:
          useNativeEncoding: true
      function:
        definition: filter
      kafka:
        binder:
          producer-properties:
            key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://schemaregistry:8081
          consumer-properties:
            key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            specific.avro.reader: true
            schema.registry.url: http://schemaregistry:8081

Но при переходе с 2.1.4 на 2.2.4 / Hoxton.SR1 внесены изменения в yml, как указано ниже для другого билета

      function:
        definition: filter
        bindings:
          filter-in-0: input
          filter-out-0: output

Все остальные сведения о приложении и конфигурации остаются прежними. Также удалена аннотация @EnableBinding.

Но при отправке сообщения на мой процессор в потоке я получаю следующее исключение


2020-04-05 23:12:45.370 ERROR 462 --- [container-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = FTV, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1586108023129, serialized key size = -1, serialized value size = 108, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = CustomJavaObject)

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@76ec21a5]; nested exception is org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:8990": Connection refused (Connection refused); nested exception is java.net.ConnectException: Connection refused (Connection refused), failedMessage=GenericMessage [payload={"providerCode": "GTB", "customerId": "123", "type": "ASSET", "emitTime": 1585101533015, "captureTime": 1585101533015, "readTime": 1585101533015, "deviceId": "DeviceId", "data": {"battery_voltage": "13", "fmi": "4", "battery_voltage2": "21", "battery_voltage3": "13"}}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@337330a0, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=FTV, kafka_receivedTimestamp=1586108023129, kafka_groupId=fls}]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1745) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1734) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1647) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1577) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1485) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1235) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:985) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:905) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_192]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_192]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_192]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@76ec21a5]; nested exception is org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:8990": Connection refused (Connection refused); nested exception is java.net.ConnectException: Connection refused (Connection refused)
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:111) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:95) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]

Caused by: org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:8990": Connection refused (Connection refused); nested exception is java.net.ConnectException: Connection refused (Connection refused)
        at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:751) ~[spring-web-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:677) ~[spring-web-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.web.client.RestTemplate.postForEntity(RestTemplate.java:452) ~[spring-web-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient.register(DefaultSchemaRegistryClient.java:69) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar!/:2.2.1.RELEASE]
        at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:308) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar!/:2.2.1.RELEASE]
        at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:125) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar!/:2.2.1.RELEASE]
        at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:217) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:207) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.lambda$convertOutputValueIfNecessary$2(BeanFactoryAwareFunctionRegistry.java:620) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[na:1.8.0_192]
        at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1359) ~[na:1.8.0_192]
        at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126) ~[na:1.8.0_192]
        at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498) ~[na:1.8.0_192]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485) ~[na:1.8.0_192]
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[na:1.8.0_192]
        at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152) ~[na:1.8.0_192]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:1.8.0_192]
        at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464) ~[na:1.8.0_192]
        at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.convertOutputValueIfNecessary(BeanFactoryAwareFunctionRegistry.java:626) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
        at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.doApply(BeanFactoryAwareFunctionRegistry.java:569) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
        at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.apply(BeanFactoryAwareFunctionRegistry.java:465) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
        at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:602) ~[spring-cloud-stream-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_192]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_192]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_192]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_192]
        at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:129) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:112) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:55) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:386) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:92) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:117) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:375) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:171) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:156) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:636) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.fallbackToInvokeExpression(MessagingMethodInvokerHelper.java:629) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInvokeExceptionAndFallbackToExpressionIfAny(MessagingMethodInvokerHelper.java:613) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:584) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:477) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:355) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:108) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        ... 34 common frames omitted

Обновлен Stacktrace of error

Любые указатели на сделать так, чтобы процессор при последней загрузке помог бы, так как наши пользовательские библиотеки обновляются и используют новейшие функции, процессоры, находящиеся на старой загрузке, не позволяют некоторым функциям работать правильно или не загружаются, например, когда configproperties использует внутренний класс или использует конструктор связывание.

Так что любые указатели для разрешения будут высоко оценены. Напомним, что в коде нет никаких HTTP-вызовов, процессор - это просто входящий процессор с только оператором log.

Определение потока, как показано ниже

:TopicName > Processor | Sink

И процессор, и приемник содержат только операторы журнала. Процессор просто возвращает полученное событие.

1 Ответ

0 голосов
/ 07 апреля 2020

Просто, чтобы подвести итог для тех, кто может столкнуться с подобной проблемой. При переходе от более низкой версии к более высокой моя становится от 2.1.4 (Greenwich.SR1) до 2.2.4 (Hoxton.SR1)

Добавьте привязки функций, как показано ниже, при этом привязки будут <functionName>-<in for input, out for output>-<param sequence number>

      function:
        definition: filter
        bindings:
          filter-in-0: input
          filter-out-0: output

Добавление конфигурации клиента реестра реестра. Кажется, что @EnableSchemaRegistry не работает

   @Bean
    public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url}") String endPoint) {
        ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endPoint);
        return client;
    }
...