Ваше приложение Kafka Streams в порядке и работает правильно.
Ошибка находится в kafka-console-consumer
(kafka.tools.ConsoleConsumer
- это класс, который реализует логику для скрипта).
Он неправильно обрабатывает null
во время десериализации.Когда он получает null
в качестве значения или ключа для сообщения, он устанавливает значение по умолчанию (массив байтов, представляющих null
String).Если вы проверяете исходный код, вы можете найти следующую функцию
def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte]) {
val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.
getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
output.write(convertedBytes)
}
Как вы можете увидеть, когда он получает исходные данные, равные нулю (sourceBytes==null
) для десериализации, он установил для этого значение по умолчанию:
val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
В вашем случае это "null".getBytes(StandardCharsets.UTF_8)
.Затем выполняется попытка десериализации с помощью org.apache.kafka.common.serialization.LongDeserializer
(ваше значение десериализатор).LongDeserializer
проверяет в самом начале размер массива байтов.Теперь оно равно 4 (байтовое представление null
), и генерируется исключение.
Если вы, например, используете StringDeserializer, он не будет десериализован должным образом, но, по крайней мере, не выдаст исключение, потому что онне проверяет длину массива байтов.
Короче говоря : модуль форматирования ConsoleConsumer, отвечающий за печать, для красивой печати задайте некоторое значение по умолчанию, которое не может быть обработанонекоторыми десериализаторами (LongDeserializer, IntegerDeserializer)
Относительно того, почему ваше приложение выдает значения null
для некоторых ключей:
KTable:filter
имеет другую семантику, чем KStream::filter
.Согласно javadoc для KTable:
для каждой удаляемой записи (т.е. не удовлетворяющей данному предикату) пересылается запись-захоронение.
Для вашего filter
, когда count <= 1
передает значение ключа null
.