Ошибка потоков Кафки: SerializationException: Размер данных, полученных LongDeserializer, не равен 8 - PullRequest
0 голосов
/ 28 февраля 2019

Я пробую Кафку Стримс.Написание простого приложения, в котором я считаю дубликаты сообщений.

Сообщение:

2019-02-27-11:16:56 :: session:prod-111656 :: Msg => Hello World: 2491
2019-02-27-11:16:56 :: session:prod-111656 :: Msg => Hello World: 2492

и т. Д.

Я пытаюсь разбить такие сообщения на session:prod-xxxx.Используйте это как ключ.И session:prod-xxxx+Hello World: xxxx используйте это как значение.Затем сгруппируйте по ключу и посмотрите, какие сообщения дублировались в каждом сеансе.

Вот код:

KStream<String, String> textLines = builder.stream("RegularProducer");
KTable<String, Long> ktable = textLines.map(
    (String key, String value) -> {
        try {
            String[] parts = value.split("::");
            String sessionId = parts[1];
            String message = ((parts[2]).split("=>"))[1];
            message = sessionId+":"+message;
            return new KeyValue<String,String>(sessionId.trim().toLowerCase(), message.trim().toLowerCase());
        } catch (Exception e) {
            return new KeyValue<String,String>("Invalid-Message".trim().toLowerCase(), "Invalid Message".trim().toLowerCase());
        }
    })
    .groupBy((key,value) -> value)
    .count().filter(
            (String key, Long value) -> {
                return value > 1;
            }
    );

ktable.toStream().to("RegularProducerDuplicates", 
Produced.with(Serdes.String(), Serdes.Long()));
Topology topology = builder.build();
topology.describe();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

Создается тема KTable RegularProducerDuplicates.Но когда я использую консоль-потребителя для просмотра, он вылетает с ошибкой.Затем я использую флаг --skip-message-on-error на консоли-потребителе.Теперь я вижу тысячи таких строк

session:prod-111656 : hello world: 994  [2019-02-28 16:25:18,081] ERROR Error processing message, skipping this message:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

Кто-нибудь может мне помочь, что здесь не так?

1 Ответ

0 голосов
/ 28 февраля 2019

Ваше приложение Kafka Streams в порядке и работает правильно.

Ошибка находится в kafka-console-consumer (kafka.tools.ConsoleConsumer - это класс, который реализует логику для скрипта).

Он неправильно обрабатывает null во время десериализации.Когда он получает null в качестве значения или ключа для сообщения, он устанавливает значение по умолчанию (массив байтов, представляющих null String).Если вы проверяете исходный код, вы можете найти следующую функцию

def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte]) {
  val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
  val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.
    getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
  output.write(convertedBytes)
}

Как вы можете увидеть, когда он получает исходные данные, равные нулю (sourceBytes==null) для десериализации, он установил для этого значение по умолчанию:

val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))

В вашем случае это "null".getBytes(StandardCharsets.UTF_8).Затем выполняется попытка десериализации с помощью org.apache.kafka.common.serialization.LongDeserializer (ваше значение десериализатор).LongDeserializer проверяет в самом начале размер массива байтов.Теперь оно равно 4 (байтовое представление null), и генерируется исключение.

Если вы, например, используете StringDeserializer, он не будет десериализован должным образом, но, по крайней мере, не выдаст исключение, потому что онне проверяет длину массива байтов.

Короче говоря : модуль форматирования ConsoleConsumer, отвечающий за печать, для красивой печати задайте некоторое значение по умолчанию, которое не может быть обработанонекоторыми десериализаторами (LongDeserializer, IntegerDeserializer)

Относительно того, почему ваше приложение выдает значения null для некоторых ключей:

KTable:filter имеет другую семантику, чем KStream::filter.Согласно javadoc для KTable:

для каждой удаляемой записи (т.е. не удовлетворяющей данному предикату) пересылается запись-захоронение.

Для вашего filter, когда count <= 1 передает значение ключа null.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...