Как получить доступ к заголовку сообщения Kafka при расширении http-приложения для начинающих - PullRequest
0 голосов
/ 17 апреля 2020

Я пытаюсь создать собственный http-преобразователь на основе образца здесь . И исходный код связующего Кафки здесь

Я использую Кафку в качестве связующего в SCDF. Сценарий, когда http вызывается, я анализирую сообщение в функции и помещаю сообщение AVRO в kafka.

Пример приложения работает нормально, когда определена функция, как показано ниже. Приложение отлично работает, конвертирует и помещает авро сообщение в kafka topi c и работает нормально, как и должно.

@Import(org.springframework.cloud.stream.app.http.source.HttpSourceConfiguration.class)
public class DemoSourceTransformer {

    @Bean
    public Function<String, Event> transform() {
        return value -> {
            //transform message here
            return event;
        };
    }
}

Требуется наличие дополнительного заголовка для дополнительной последующей обработки на основе заголовка. Я не могу получить доступ к заголовку в версии загрузки, которую я имею. 2.1.12.

Изменение, которое я сделал на основе некоторых других примеров из документации по облачному потоку, заключается в определении функции таким образом

  @Bean
    public Function<Message<String>, Message<Event>> transform() {
        return value -> {
            //transform message here
            return MessageBuilder.withPayload(event)
                    .setHeader("custom_key", "custom_value")
                    .build();
        };
    }

Но на этот раз сообщение не передается в kafka topi c и вместо этого увидеть следующую ошибку.

java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
    at io.confluent.kafka.serializers.AvroSchemaUtils.getSchema(AvroSchemaUtils.java:76) ~[kafka-avro-serializer-5.3.1.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54) ~[kafka-avro-serializer-5.3.1.jar:na]
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.1.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:489) ~[spring-kafka-2.2.12.RELEASE.jar:2.2.12.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:407) ~[spring-kafka-2.2.12.RELEASE.jar:2.2.12.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:242) ~[spring-kafka-2.2.12.RELEASE.jar:2.2.12.RELEASE]
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:461) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:177) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:275) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:849) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:89) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:146) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:204) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:99) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1925) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1799) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:172) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:70) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxPeekFuseable.subscribe(FluxPeekFuseable.java:86) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.Flux.subscribe(Flux.java:7957) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:86) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:92) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume.subscribe(FluxOnErrorResume.java:47) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.Flux.subscribe(Flux.java:7957) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:442) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:244) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:426) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:268) ~[reactor-core-3.2.14.RELEASE.jar:3.2.14.RELEASE]
    at org.springframework.integration.channel.MessageChannelReactiveUtils.lambda$null$0(MessageChannelReactiveUtils.java:67) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:151) ~[spring-messaging-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143) ~[spring-messaging-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:413) ~[spring-integration-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.http.inbound.HttpRequestHandlingEndpointSupport.actualDoHandleRequest(HttpRequestHandlingEndpointSupport.java:374) ~[spring-integration-http-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.http.inbound.HttpRequestHandlingEndpointSupport.doHandleRequest(HttpRequestHandlingEndpointSupport.java:253) ~[spring-integration-http-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.integration.http.inbound.HttpRequestHandlingMessagingGateway.handleRequest(HttpRequestHandlingMessagingGateway.java:112) ~[spring-integration-http-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.web.servlet.mvc.HttpRequestHandlerAdapter.handle(HttpRequestHandlerAdapter.java:53) ~[spring-webmvc-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040) ~[spring-webmvc-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) ~[spring-webmvc-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) ~[spring-webmvc-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:660) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.springframework.boot.actuate.web.trace.servlet.HttpTraceFilter.doFilterInternal(HttpTraceFilter.java:88) ~[spring-boot-actuator-2.1.12.RELEASE.jar:2.1.12.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:209) ~[spring-security-web-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178) ~[spring-security-web-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:358) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:271) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:94) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.filterAndRecordMetrics(WebMvcMetricsFilter.java:114) ~[spring-boot-actuator-2.1.12.RELEASE.jar:2.1.12.RELEASE]
    at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:104) ~[spring-boot-actuator-2.1.12.RELEASE.jar:2.1.12.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.1.13.RELEASE.jar:5.1.13.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) ~[tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:367) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:860) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1598) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_221]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_221]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.30.jar:9.0.30]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_221]

В настоящее время я использую kafka avro serializer версии 5.3.1. Есть ли какая-либо другая версия, которую мне нужно использовать, или другой способ убедиться, что я могу добавить заголовки к сообщению в topi c в дополнение к полезной нагрузке avro.

1 Ответ

0 голосов
/ 30 апреля 2020

Не ясно, что такое Message или Event, но ошибка говорит о том, что это явно не тип Avro.

Avro требуется схема. Где ваша схема?

Например, вы должны использовать GenericRecord или создать значения, которые вы отправляете, используя Плагин Avro Maven

Как получить доступ к Kafka Заголовок сообщения

Это зависит от того, предоставит ли Spring эту информацию. В клиенте basi c Kafka Java, который является частью ConsumerRecord

...