Как работать с JSON в теме Кафки? - PullRequest
0 голосов
/ 06 июня 2018

Я создаю Java-приложение, в котором есть несколько JSON объектов (в частности, объектов AnyJson из com.satori.rtm.model.AnyJson), и я хочу отправитьэти объекты в теме Kafka .Должен ли я отправить их в AnyJson o String типа?Я спрашиваю об этом, потому что конструктор KafkaProducer<K, V>, кажется, имеет некоторые проблемы для обработки значений JSON при (де) сериализации.

Это в моей конфигурации производителя

Properties props= new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerEndpoint);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
kafkaProducer= new KafkaProducer<Integer, AnyJson>(props);

, где JsonSerializer.class из org.springframework.kafka.support.serializer.JsonSerializer; Я видел несколько пакетов, которые обрабатывают объекты JSON (как kafka.utils.json, com.google.gson.JsonObject ; и т. Д.).Затем при запуске

ProducerRecord<Integer, AnyJson> record= new ProducerRecord<Integer, AnyJson>(topic, json);
kafkaProducer.send(record);

у меня следующее исключение Не найден сериализатор для класса com.satori.rtm.connection.GsonSerializer $ JsonElementWrapper

Любая помощь?

1 Ответ

0 голосов
/ 06 июня 2018

Возможно, вы захотите использовать org.apache.kafka.common.serialization.ByteArraySerializer для сериализации ключей и значений.Теперь вы должны сконфигурировать вашу запись производителя с помощью byte [].

Затем используйте ObjectMapper от Джексона (http://www.baeldung.com/jackson-object-mapper-tutorial) для преобразования любого объекта json в байтовый массив.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...