Spring Cloud Stream Kafka с Confluent Schema Registry не смог отправить сообщение в выходной канал - PullRequest
0 голосов
/ 25 апреля 2019

Я хотел бы протестировать Spring Cloud Stream с Confluent Schema Registry и эволюцией схем Avro, чтобы интегрировать это в своё приложение. Я обнаружил, что Spring Cloud Stream не поддерживает защищённое соединение с Confluent Schema Registry, и реализация по-прежнему очень простая. Поэтому я решил использовать клиент Confluent Schema Registry вместе с Spring Kafka для работы с реестром схем, а Spring Cloud Stream — для всего остального. Вот код потребителя и производителя, а также схемы Avro:

Потребитель:

import com.example.Sensor;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Consumer side of the example: a Spring Boot application bound to the
 * {@link Sink} channel that logs every {@code Sensor} payload it receives.
 */
@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsumerApplication {

    private final Log logger = LogFactory.getLog(getClass());

    /**
     * Application entry point; delegates to {@link SpringApplication#run}.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    /**
     * Listener attached to {@code Sink.INPUT}; writes the received payload to the log at INFO level.
     *
     * @param data the sensor reading delivered on the input channel
     */
    @StreamListener(Sink.INPUT)
    public void process(Sensor data) {
        logger.info(data);
    }

    /**
     * Registers a Confluent {@link SchemaRegistryClient} bean built from the configured endpoint.
     */
    @Configuration
    static class ConfluentSchemaRegistryConfiguration {

        /**
         * Creates the schema registry client.
         *
         * @param endpoint value of {@code spring.cloud.stream.schemaRegistryClient.endpoint}
         * @return a {@link CachedSchemaRegistryClient} pointing at {@code endpoint}
         */
        @Bean
        public SchemaRegistryClient schemaRegistryClient(
                @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
            // NOTE(review): the second argument (100) is presumably the client's cache capacity — confirm.
            return new CachedSchemaRegistryClient(endpoint, 100);
        }
    }
}

Application.yaml:

# NOTE(review): this value uses the Kafka host/port placeholder, while the registry endpoint
# further down uses schema-registry-url — confirm which address is intended here.
spring.kafka.properties.schema.registry.url: kafka-url:kafka-port
spring:
  cloud:
    stream:
      bindings:
        # The "input" binding is mapped to the sensor-topic destination.
        input:
          destination: sensor-topic
      # Endpoint read by the schemaRegistryClient bean through its @Value placeholder;
      # the credentials are embedded in the URL.
      schemaRegistryClient:
        endpoint: https://user:password@schema-registry-url:schema-registry-port
      schema:
        avro:
          # Avro schema file looked up on the classpath.
          schema-locations: classpath:avro/sensor.avsc
      kafka:
        binder:
          brokers: SSL://kafka-url:kafka-port
          # Binder-level client properties: SSL trust/key stores, Confluent Avro
          # deserializers, and basic-auth settings.
          configuration:
            security.protocol: SSL
            ssl.truststore.type: JKS
            ssl.truststore.location: client.truststore.jks
            ssl.truststore.password: secret
            ssl.keystore.type: PKCS12
            ssl.keystore.location: client.keystore.p12
            ssl.keystore.password: secret
            ssl.key.password: secret
            key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            basic.auth.credentials.source: USER_INFO
            basic.auth.user.info: user:password
# HTTP port of the consumer application.
server.port: 9999

Производитель:

import com.example.Sensor;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;
import java.util.UUID;

/**
 * Producer side of the example: a Spring Boot REST application that publishes
 * {@code Sensor} payloads through the bound {@link Source} output channel.
 */
@SpringBootApplication
@EnableBinding(Source.class)
@RestController
public class Producer1Application {

    /** Suffix appended to every sensor id sent by this producer. */
    private static final String ID_SUFFIX = "-v1";

    /** Response body returned by both POST endpoints after sending. */
    private static final String RESPONSE = "ok, have fun with v1 payload!";

    @Autowired
    private Source source;

    private final Random random = new Random();

    /**
     * Application entry point; delegates to {@link SpringApplication#run}.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Producer1Application.class, args);
    }

    /**
     * Sends a sensor reading with a random id and random measurements.
     *
     * @return a fixed confirmation string
     */
    @RequestMapping(value = "/messages", method = RequestMethod.POST)
    public String sendMessage() {
        return publish(randomSensor());
    }

    /**
     * Builds a sensor with a random UUID-based id and random measurements:
     * acceleration in [0, 10), velocity in [0, 100), temperature in [0, 50).
     */
    private Sensor randomSensor() {
        // Arguments are evaluated left to right, so the random draws happen in the
        // order acceleration, velocity, temperature.
        return buildSensor(
                UUID.randomUUID().toString(),
                random.nextFloat() * 10,
                random.nextFloat() * 100,
                random.nextFloat() * 50);
    }

    /**
     * Convenience POST method for testing with deterministic values.
     *
     * @param id           sensor id; the {@code "-v1"} suffix is appended before sending
     * @param acceleration acceleration value to send
     * @param velocity     velocity value to send
     * @param temperature  temperature value to send
     * @return a fixed confirmation string
     */
    @RequestMapping(value = "/messagesX", method = RequestMethod.POST)
    public String sendMessageX(@RequestParam(value="id") String id, @RequestParam(value="acceleration") float acceleration,
                               @RequestParam(value="velocity") float velocity, @RequestParam(value="temperature") float temperature) {
        return publish(buildSensor(id, acceleration, velocity, temperature));
    }

    /** Creates a {@code Sensor} whose id is {@code id} plus the version suffix. */
    private Sensor buildSensor(String id, float acceleration, float velocity, float temperature) {
        Sensor sensor = new Sensor();
        sensor.setId(id + ID_SUFFIX);
        sensor.setAcceleration(acceleration);
        sensor.setVelocity(velocity);
        sensor.setTemperature(temperature);
        return sensor;
    }

    /** Wraps the sensor in a message, sends it on the output channel and returns the confirmation string. */
    private String publish(Sensor sensor) {
        // The boolean result of send() is not inspected, as in the original endpoints.
        source.output().send(MessageBuilder.withPayload(sensor).build());
        return RESPONSE;
    }

    /**
     * Registers a Confluent {@link SchemaRegistryClient} bean built from the configured endpoint.
     */
    @Configuration
    static class ConfluentSchemaRegistryConfiguration {

        /**
         * Creates the schema registry client.
         *
         * @param endpoint value of {@code spring.cloud.stream.schemaRegistryClient.endpoint}
         * @return a {@link CachedSchemaRegistryClient} pointing at {@code endpoint}
         */
        @Bean
        public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
            // NOTE(review): the second argument (100) is presumably the client's cache capacity — confirm.
            return new CachedSchemaRegistryClient(endpoint, 100);
        }
    }

}

Application.yaml

# NOTE(review): this value uses the Kafka host/port placeholder, while the registry endpoint
# further down uses schema-registry-url — confirm which address is intended here.
spring.kafka.properties.schema.registry.url: kafka-url:kafka-port
spring:
  cloud:
    stream:
      bindings:
        # The "output" binding is mapped to the sensor-topic destination and
        # declares an Avro content type for outgoing messages.
        output:
          contentType: application/*+avro
          destination: sensor-topic
      # Endpoint read by the schemaRegistryClient bean through its @Value placeholder;
      # the credentials are embedded in the URL.
      schemaRegistryClient:
        endpoint: https://user:password@schema-registry-url:schema-registry-port
      schema:
        avro:
          # Avro schema file looked up on the classpath.
          schema-locations: classpath:avro/sensor.avsc
      kafka:
        binder:
          brokers: SSL://kafka-url:kafka-port
          # Binder-level client properties: SSL trust/key stores, Confluent Avro
          # serializers, and basic-auth settings.
          configuration:
            security.protocol: SSL
            ssl.truststore.type: JKS
            ssl.truststore.location: client.truststore.jks
            ssl.truststore.password: secret
            ssl.keystore.type: PKCS12
            ssl.keystore.location: client.keystore.p12
            ssl.keystore.password: secret
            ssl.key.password: secret
            key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            basic.auth.credentials.source: USER_INFO
            basic.auth.user.info: user:password
# HTTP port of the producer application.
server.port: 9009

Схема потребителя:

{
  "namespace" : "com.example",
  "type" : "record",
  "name" : "Sensor",
  "fields" : [
    {"name":"id","type":"string"},
    {"name":"internalTemperature", "type":"float", "default":0.0, "aliases":["temperature"]},
    {"name":"externalTemperature", "type":"float", "default":0.0},
    {"name":"acceleration", "type":"float","default":0.0},
    {"name":"velocity","type":"float","default":0.0}
  ]
}

Схема производителя:

{
  "namespace" : "com.example",
  "type" : "record",
  "name" : "Sensor",
  "fields" : [
    {"name":"id","type":"string"},
    {"name":"temperature", "type":"float", "default":0.0},
    {"name":"acceleration", "type":"float","default":0.0},
    {"name":"velocity","type":"float","default":0.0}
  ]
}

Я застрял со следующим исключением. Буду признателен, если вы поможете мне понять, в чем проблема.

java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={"id": "a50a7646-a1c9-49a2-a7f6-f09b77fc3116-v1", "temperature": 10.430967, "acceleration": 5.434994, "velocity": 70.19337}, headers={id=de6e40e4-ba8f-9dca-1753-c06a1751e2d4, contentType=application/*+avro, timestamp=1556160207806}]' to outbound message.
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:325) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:353) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:608) ~[spring-integration-core-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:443) ~[spring-integration-core-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at sample.producer1.Producer1Application.sendMessage(Producer1Application.java:39) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1038) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:908) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:660) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.springframework.boot.actuate.web.trace.servlet.HttpTraceFilter.doFilterInternal(HttpTraceFilter.java:90) ~[spring-boot-actuator-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.filterAndRecordMetrics(WebMvcMetricsFilter.java:117) ~[spring-boot-actuator-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:106) ~[spring-boot-actuator-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:200) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:834) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1415) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_191]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_191]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]

P.S.: Я использую Confluent Client 5.2.1, Avro 1.8.2, Spring Boot 2.1.4.RELEASE и Spring Cloud Stream Fishtown.RELEASE.

1 Ответ

1 голос
/ 25 апреля 2019

Я полагаю, вы используете версию Spring Cloud Stream 2.0+. Если это так, возможно, вы столкнулись с проблемой согласования типа содержимого. В сообщении об ошибке заголовок типа содержимого в GenericMessage равен application/*+avro. Тип содержимого по умолчанию, начиная с версии 2.0, — application/json.

Вам необходимо добавить org.springframework.messaging.converter.MessageConverter для обработки содержимого Avro, который отсутствует в конфигурации. См.: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#spring-cloud-stream-preface-content-type-negotiation-improvements

- Обновление -

Конвертер уже существует, его просто нужно явно включить для исходящих сообщений, это можно сделать через конфигурацию:

spring.cloud.stream.bindings.output.contentType=application/*+avro

Код пользователя выглядит нормально, с SchemaRegistryClient.

Документы: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_avro_schema_registry_client_message_converters

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...