Spring Cloud Stream & Kafka: ошибка сериализации для производителя Kafka - PullRequest
0 голосов
/ 26 октября 2019

Я новичок в Spring Cloud Stream и Kafka. Я получаю следующую ошибку от производителя kafka при отправке строки в полезной нагрузке. Любая помощь или идеи очень ценится. Я попытался использовать сериализатор / десериализатор bytearray, а также json, в отличие от простого текста.

сообщение об ошибке: org.apache.kafka.common.errors.SerializationException: Невозможно преобразовать значение класса [B в класс org.apache.kafka.common.serialization.StringSerializer, указанный в value.serializer

стек ошибок:

2019-10-25 16:13:40.762 ERROR 4628 --- [  XNIO-1 task-1] o.z.problem.spring.common.AdviceTraits   : Internal Server Error

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@ebad77c]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:186)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at com.ll.kafkaservice.greeting.GreetingsService.sendGreeting(GreetingsService.java:30)
        at com.ll.kafkaservice.greeting.GreetingsController.greetings(GreetingsController.java:29)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
		....
		
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
        at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
        at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:894)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:470)
        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:407)
        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:242)
        at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at com.ll.kafkaservice.greeting.GreetingsService.sendGreeting(GreetingsService.java:30)
        at com.ll.kafkaservice.greeting.GreetingsController.greetings(GreetingsController.java:29)

Здесь перечислены настройки Spring Cloud Stream

  cloud:
    stream:
      bindings:
          greetings-in:
              destination: greetings
              #content-type: application/json
              content-type: text/plain
          greetings-out:
              destination: greetings
              #content-type: application/json
              content-type: text/plain

Настройки производителя

2019-10-25 16:12:28.346  INFO 4628 --- [  restartedMain] com.ll.kafkaservice.KafkaServiceApp      : Started KafkaServiceApp in 22.248 seconds (JVM running for 23.136)
2019-10-25 16:12:28.366 DEBUG 4628 --- [  restartedMain] c.l.k.aop.logging.LoggingAspect          : Enter: com.ll.kafkaservice.service.KafkaServiceKafkaProducer.init() with argument[s] = []
2019-10-25 16:12:28.377  INFO 4628 --- [  restartedMain] c.l.k.service.KafkaServiceKafkaProducer  : Kafka producer initializing...
2019-10-25 16:12:28.378  INFO 4628 --- [  restartedMain] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
        acks = 1
        batch.size = 16384
        bootstrap.servers = [localhost:9092]
        buffer.memory = 33554432
        client.dns.lookup = default
        client.id =
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2019-10-25 16:12:28.399  INFO 4628 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.0

Здесь указан объект, содержащий сообщение

package com.ll.kafkaservice.messaging;

import java.io.Serializable;

public class Greeting  implements Serializable {
	private static final long serialVersionUID = 1L;
	
	private String message;

	
	public Greeting() {
	}

	
	public String getMessage() {
		return message;
	}

	public void setMessage(String message) {
		this.message = message;
	}
	
	public String toString() {
		StringBuffer sbuffer = new StringBuffer();
		
		sbuffer.append("{");
		sbuffer.append("message:");
		sbuffer.append(message);
		sbuffer.append("}");
		
		return sbuffer.toString();
	}
}

Определение потоков

package com.ll.kafkaservice.greeting;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;


public interface GreetingsStreams {
    String INPUT = "greetings-in";
    String OUTPUT = "greetings-out";
    
    @Input(INPUT)
    SubscribableChannel inboundGreetings();
    
    @Output(OUTPUT)
    MessageChannel outboundGreetings();
}

Связать потоки

package com.ll.kafkaservice.config;

import org.springframework.cloud.stream.annotation.EnableBinding;
import com.ll.kafkaservice.greeting.GreetingsStreams;


@EnableBinding(GreetingsStreams.class)
public class StreamsConfiguration {

}

Создание / отправка сообщения

package com.ll.kafkaservice.greeting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

import com.ll.kafkaservice.messaging.Greeting;


@Service
public class GreetingsService {
    private final Logger log = LoggerFactory.getLogger(GreetingsService.class);
    
    private final GreetingsStreams greetingsStreams;
    
    private MessageChannel messageChannel;
    
    
    public GreetingsService(GreetingsStreams greetingsStreams) {
        this.greetingsStreams = greetingsStreams;
    }
    
    public void sendGreeting(final Greeting greeting) {
        messageChannel = greetingsStreams.outboundGreetings();
        log.info("Before send {}", greeting.toString());
        messageChannel.send(MessageBuilder
        		// Sends a string to payload not the object
                .withPayload(greeting.getMessage())
                // Note:  tried this with and without the header
                //.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
                .build());
    }
}

Потребление / получение сообщения

package com.ll.kafkaservice.greeting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import com.ll.kafkaservice.messaging.Greeting;



@Component
public class GreetingsListener {
    private final Logger log = LoggerFactory.getLogger(GreetingsListener.class);

    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(@Payload Greeting greetings) {
        log.info("Received greetings: {}", greetings.getMessage());
    }
    
    //@StreamListener(GreetingsStreams.INPUT)
    //public void handleGreetings(String greetings) {
    //    log.info("Received greetings: {}", greetings);
    //}
}

Ответы [ 2 ]

0 голосов
/ 08 ноября 2019

Вам также необходимо опубликовать pom и файл свойств приложения. Я держу пари, что в приложении могут быть такие строки:

kafka:
consumer:
  key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
  value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
  bootstrap.servers: localhost:9092
  group.id: fixed-asset-service
  auto.offset.reset: earliest
producer:
  key.serializer: org.apache.kafka.common.serialization.StringSerializer
  value.serializer: org.apache.kafka.common.serialization.StringSerializer
  bootstrap.servers: localhost:9092

Обычно это происходит потому, что блоги и учебные пособия, как правило, показывают, как отправить строку. Таким образом, сразу же после того, как вы попробуете какой-нибудь пользовательский объект, он почти наверняка потерпит неудачу. С spring-cloud-stream не нужно указывать десериализаторы, как это делается автоматически. Это одна из причин, по которой можно использовать такую ​​среду, как spring-cloud-stream. И если вы найдете такую ​​строку в свойствах приложения, можете ли вы заменить ее на следующую:

key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

Она должна работать. Еще лучше остаться в строках и сохранить конфигурацию spring.cloud.streams

0 голосов
/ 26 октября 2019

По умолчанию платформа SCSt преобразует полезную нагрузку в byte[] и использует ByteArraySerializers.

Поскольку вы настроили привязку для использования пользовательских сериализаторов, вы должны установить useNativeEncoding в true. См. Свойства источника .

useNativeEncoding

Если установлено значение true, исходящее сообщение сериализуется напрямую клиентской библиотекой, которая должнабыть настроен соответствующим образом (например, установить соответствующий сериализатор значений производителя Kafka). Когда эта конфигурация используется, сортировка исходящих сообщений не основана на contentType привязки. Когда используется собственное кодирование, потребитель обязан использовать соответствующий декодер (например, десериализатор потребительских значений Kafka) для десериализации входящего сообщения. Кроме того, когда используется собственное кодирование и декодирование, свойство headerMode = embeddedHeaders игнорируется, а заголовки не включаются в сообщение. См. Потребительское свойство useNativeDecoding.

Однако вам нужно будет использовать JsonSerializer, а не просто сериализатор String, если вы хотите отправить POJO.

По какой-то причине выне полагаясь на рамки, чтобы сделать преобразование для вас?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...