Я думаю, что это не проблема с serdes для count()
.
Если вы не передаете Materialized
, используются serdes от объекта, для которого вы позвонили count()
.Эта цепочка ищущих сердов идет до метода, в котором вы прошли свои последние серды.В вашем случае это .groupByKey(Grouped.with(Serdes.Integer(), Serdes.Long()))
.Serdes не проблема, потому что count()
и suppress(...)
будут использовать для ключа Serdes.Integer()
и для значения Serdes.Long())
.
Я попытался воспроизвести ваше исключение, и я смог это сделать,только когда я изменил тип ключей в сообщениях и Serdes
, которые были обработаны функцией suppress
(тип ключа группировки) и перезапустил приложение .Исключение выдается, когда KafkaStreams пытается сбросить данные во время фиксации.
Как я их воспроизвел:
Сначала создайте несколько сообщений производителем и выполните следующий код.Тип ключа важен (длинный)
final KStream<String, EventsAvro> stream = builder.stream("events_topic");
KStream<Long, Long> events = stream.map((k, v) -> new KeyValue<Long, Long>((long) v.getPageId(), v.getUserId()));
KGroupedStream<Long, Long> groupedStream = events.groupByKey(Grouped.with(Serdes.Long(), Serdes.Long()));
KTable<Windowed<Long>, Long> windowedCount = groupedStream
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(ofMillis(5L)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()));
windowedCount.toStream()
.map((key, value) -> new KeyValue<>(key.key().longValue(),value.longValue()))
.to("test_topic",Produced.with(Serdes.Long(),Serdes.Long()));
Через 1-2 минуты остановите приложение и верните изменение в исходный код: тип ключа важен (целое число)
final KStream<String, EventsAvro> stream = builder.stream("events_topic");
KStream<Integer, Long> events = stream.map((k, v) -> new KeyValue<Integer, Long>(v.getPageId(), v.getUserId()));
KGroupedStream<Integer, Long> groupedStream = events.groupByKey(Grouped.with(Serdes.Integer(), Serdes.Long()));
KTable<Windowed<Integer>, Long> windowedCount = groupedStream
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(ofMillis(5L)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()));
windowedCount.toStream()
.map((key, value) -> new KeyValue<>(key.key().intValue(),value.longValue()))
.to("test_topic",Produced.with(Serdes.Integer(),Serdes.Long()));
Создайте несколько сообщений, подождите 10 минут (зависит от вашего окна), создайте еще несколько сообщений и дождитесь выполнения коммита (30 секунд) - ваше исключение будет выдано.
Что идет не так?
Проблема в том, что в suppress(...)
ключи старых сообщений были сериализованы с использованием старых серий.
suppress(...)
Операция выполняется * 1041.*.Он имеет внутренний буфер, в котором хранятся сообщения перед их пересылкой (когда они истек ) на следующий ProcessorNode.Suppress
нужна временная метка, поэтому его буфер в качестве ключа сообщения имеет композицию временной метки и массива байтов (после сериализации бизнес-ключа с business Serdes ) значение сообщения - только массив байтов (после сериализации бизнес-значения).
Подводя итог: буфер внутренне не заботится о типах бизнес-сообщений.Внутренний буфер материализуется в SUPPRESS changelog .
Если сообщение пересылается на следующий ProcessorNode, KTableSuppressProcessor
:
- удаляет сообщение из внутреннего буфера (во время очистки сообщенияс нулевым значением будет отправлено в SUPPRESS changelog ).
- десериализовать сообщение (массив байтов) в значение бизнес-ключа nad и переслать следующему узлу.В вашем случае десериализация - это Integer и Long.<- Я думаю, что здесь было сгенерировано исключение </li>
Вопрос в том, почему исключение не было сгенерировано при запуске, но через некоторое время?
В первомиз приведенного выше фрагмента кода Long используется в качестве ключа в группировке.Когда сообщения передаются в suppress
, suppress
сериализует ключ как массив байтов и использует временную метку с этим массивом байтов в качестве ключа для своего внутреннего буфера.Когда приложение остановлено, внутренний буфер материализуется в тему SUPPRESS changelog .
Если мы изменим тип ключа группировки на Integer (второй фрагмент кода) и запустим приложение, основываясь на теме журнала изменений SUPPRESS, внутренниеБуфер будет восстановлен.Во время восстановления из необработанного ключа извлекается только временная метка.Массив байтов, представляющий бизнес-часть, не затрагивается.
Когда новые сообщения будут переданы в suppress
, они будут обрабатываться так же, как и ранее (ключ будет сериализован в массив байтов и будет использоваться отметка временикак ключ внутреннего буфера).После обработки каждого сообщения KTableSuppressProcessor
проверяет, истекла ли временная метка любых буферизованных сообщений, и если это произойдет, попробуйте переслать ее на следующий ProcessorNode.
В нашем примере в качестве ключей во внутреннем буфере у нас есть метка времени (long) и массивбайтов, представляющих бизнес-ключ (например, 8 байтов для Long и 4 байта для Integer).Поэтому перед отправкой KTableSuppressProcessor
попытается десериализовать эти массивы (они имеют разную длину), используя IntegerDeserializer
.Массив байтов, представляющих Long, будет слишком длинным , и IntegerDeserializer выдаст исключение.Эта операция происходит не при запуске приложения, а при выполнении фиксации.
Другой вопрос может быть следующим: почему исключение не выдается, если мы запускаем обе версии программы без: suppress
.
KStreamWindowAggregate
(отвечает за агрегацию) только проходитагрегированное сообщение, когда его значение было изменено.Поскольку мы меняем Serdes, мы не будем изменять старую агрегацию (ключ будет сериализован в другой массив байтов), а добавим новую.С другой стороны, KTableSuppressProcessor
передает все просроченные сообщения, даже те, которые были буферизованы с более старыми Serdes.