Как перейти на пользовательский кодировщик в Кафке производителя? - PullRequest
0 голосов
/ 15 мая 2018

Я получаю следующую ошибку при попытке использовать строку в качестве ключа в теме kafka.

18/05/14 17:08:26 ERROR async.DefaultEventHandler: Error serializing message for topic my_topic
java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:130)
    at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:127)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:127)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:53)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

Кажется, проблема в том, что кодировщик по умолчанию

public class DefaultEncoder implements Encoder<byte[]> 

делаетне поддерживается строка в байт

public byte[] toBytes(byte[] value) {
    return value;
}

Как правильно предоставить пользовательский кодировщик производителю?

И нужно ли также вносить изменения на стороне потребителя?

1 Ответ

0 голосов
/ 24 мая 2018

В Producer / Consumer вы можете указать другой сериализатор, а не ByteArraySerializer по умолчанию, Здесь вы можете найти доступные сериализаторы для версии 1.1.0, или вы можете указать свой собственный.

Как правило, если вы отправляете / получаете строки или Json, вы можете просто использовать значения по умолчанию org.apache.kafka.common.serialization.StringSerializer и org.apache.kafka.common.serialization.StringDeserializer.Для производителя / потребителя доступны следующие свойства:

private void configureProducer() {
    Properties props = new Properties();
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}

private void configureConsumer() {
    Properties props = new Properties();
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringDeserializer");
}

Properties producerProps = configureProducer();
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
Properties consumerProps = configureConsumer();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

Если вы хотите отправить свой собственный компонент EJB, то либо преобразуйте его в json и используйте StringSerializer / Deserializer по умолчанию, либо создайте свои собственные классы сериализации / десериализации.Вы можете увидеть пример использования json-строк с пружинной загрузкой здесь

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...