SerializationException: Размер данных, полученных IntegerDeserializer не 4 - PullRequest
0 голосов
/ 29 января 2019

Я получаю эту ошибку при выполнении kstream для получения агрегированного подсчета.

Exception in thread "KStreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_2] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000002
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:220)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:491)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:431)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:346)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:405)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1029)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4

Вот код, который я выполняю

 final KStream<String, EventsAvro> stream = builder.stream("events_topic");

    KStream<Integer, Long> events = stream.map((k, v) -> new KeyValue<Integer, Long>(v.getPageId(), v.getUserId()));

    KGroupedStream<Integer, Long> groupedStream = events.groupByKey(Grouped.with(Serdes.Integer(), Serdes.Long()));
    KTable<Windowed<Integer>, Long> windowedCount = groupedStream
                .windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(ofMillis(5L)))
                .count()
                .suppress(Suppressed.untilWindowCloses(unbounded()));

        windowedCount.toStream()
                .map((key, value) -> new KeyValue<>(key.key().intValue(),value.longValue()))
                .to("test_topic",Produced.with(Serdes.Integer(),Serdes.Long()));

Раньше он работал нормально, прежде чем я добавилэтот код подавления. Есть идеи?

1 Ответ

0 голосов
/ 02 февраля 2019

Я думаю, что это не проблема с serdes для count().

Если вы не передаете Materialized, используются serdes от объекта, для которого вы позвонили count().Эта цепочка ищущих сердов идет до метода, в котором вы прошли свои последние серды.В вашем случае это .groupByKey(Grouped.with(Serdes.Integer(), Serdes.Long())).Serdes не проблема, потому что count() и suppress(...) будут использовать для ключа Serdes.Integer() и для значения Serdes.Long()).

Я попытался воспроизвести ваше исключение, и я смог это сделать,только когда я изменил тип ключей в сообщениях и Serdes, которые были обработаны функцией suppress (тип ключа группировки) и перезапустил приложение .Исключение выдается, когда KafkaStreams пытается сбросить данные во время фиксации.

Как я их воспроизвел:

Сначала создайте несколько сообщений производителем и выполните следующий код.Тип ключа важен (длинный)

final KStream<String, EventsAvro> stream = builder.stream("events_topic");
KStream<Long, Long> events = stream.map((k, v) -> new KeyValue<Long, Long>((long) v.getPageId(), v.getUserId()));

KGroupedStream<Long, Long> groupedStream = events.groupByKey(Grouped.with(Serdes.Long(), Serdes.Long()));
KTable<Windowed<Long>, Long> windowedCount = groupedStream
            .windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(ofMillis(5L)))
            .count()
            .suppress(Suppressed.untilWindowCloses(unbounded()));

    windowedCount.toStream()
            .map((key, value) -> new KeyValue<>(key.key().longValue(),value.longValue()))
            .to("test_topic",Produced.with(Serdes.Long(),Serdes.Long()));

Через 1-2 минуты остановите приложение и верните изменение в исходный код: тип ключа важен (целое число)

final KStream<String, EventsAvro> stream = builder.stream("events_topic");

KStream<Integer, Long> events = stream.map((k, v) -> new KeyValue<Integer, Long>(v.getPageId(), v.getUserId()));

KGroupedStream<Integer, Long> groupedStream = events.groupByKey(Grouped.with(Serdes.Integer(), Serdes.Long()));
KTable<Windowed<Integer>, Long> windowedCount = groupedStream
            .windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(ofMillis(5L)))
            .count()
            .suppress(Suppressed.untilWindowCloses(unbounded()));

    windowedCount.toStream()
            .map((key, value) -> new KeyValue<>(key.key().intValue(),value.longValue()))
            .to("test_topic",Produced.with(Serdes.Integer(),Serdes.Long()));

Создайте несколько сообщений, подождите 10 минут (зависит от вашего окна), создайте еще несколько сообщений и дождитесь выполнения коммита (30 секунд) - ваше исключение будет выдано.

Что идет не так?

Проблема в том, что в suppress(...) ключи старых сообщений были сериализованы с использованием старых серий.

suppress(...) Операция выполняется * 1041.*.Он имеет внутренний буфер, в котором хранятся сообщения перед их пересылкой (когда они истек ) на следующий ProcessorNode.Suppress нужна временная метка, поэтому его буфер в качестве ключа сообщения имеет композицию временной метки и массива байтов (после сериализации бизнес-ключа с business Serdes ) значение сообщения - только массив байтов (после сериализации бизнес-значения).

Подводя итог: буфер внутренне не заботится о типах бизнес-сообщений.Внутренний буфер материализуется в SUPPRESS changelog .

Если сообщение пересылается на следующий ProcessorNode, KTableSuppressProcessor:

  • удаляет сообщение из внутреннего буфера (во время очистки сообщенияс нулевым значением будет отправлено в SUPPRESS changelog ).
  • десериализовать сообщение (массив байтов) в значение бизнес-ключа nad и переслать следующему узлу.В вашем случае десериализация - это Integer и Long.<- Я думаю, что здесь было сгенерировано исключение </li>

Вопрос в том, почему исключение не было сгенерировано при запуске, но через некоторое время?

В первомиз приведенного выше фрагмента кода Long используется в качестве ключа в группировке.Когда сообщения передаются в suppress, suppress сериализует ключ как массив байтов и использует временную метку с этим массивом байтов в качестве ключа для своего внутреннего буфера.Когда приложение остановлено, внутренний буфер материализуется в тему SUPPRESS changelog .

Если мы изменим тип ключа группировки на Integer (второй фрагмент кода) и запустим приложение, основываясь на теме журнала изменений SUPPRESS, внутренниеБуфер будет восстановлен.Во время восстановления из необработанного ключа извлекается только временная метка.Массив байтов, представляющий бизнес-часть, не затрагивается.

Когда новые сообщения будут переданы в suppress, они будут обрабатываться так же, как и ранее (ключ будет сериализован в массив байтов и будет использоваться отметка временикак ключ внутреннего буфера).После обработки каждого сообщения KTableSuppressProcessor проверяет, истекла ли временная метка любых буферизованных сообщений, и если это произойдет, попробуйте переслать ее на следующий ProcessorNode.

В нашем примере в качестве ключей во внутреннем буфере у нас есть метка времени (long) и массивбайтов, представляющих бизнес-ключ (например, 8 байтов для Long и 4 байта для Integer).Поэтому перед отправкой KTableSuppressProcessor попытается десериализовать эти массивы (они имеют разную длину), используя IntegerDeserializer.Массив байтов, представляющих Long, будет слишком длинным , и IntegerDeserializer выдаст исключение.Эта операция происходит не при запуске приложения, а при выполнении фиксации.

Другой вопрос может быть следующим: почему исключение не выдается, если мы запускаем обе версии программы без: suppress.

KStreamWindowAggregate (отвечает за агрегацию) только проходитагрегированное сообщение, когда его значение было изменено.Поскольку мы меняем Serdes, мы не будем изменять старую агрегацию (ключ будет сериализован в другой массив байтов), а добавим новую.С другой стороны, KTableSuppressProcessor передает все просроченные сообщения, даже те, которые были буферизованы с более старыми Serdes.

...