Проблема заключается в сбивающей с толку асимметрии в том, что Streams автоматически переносит явные значения serde во время работы с окнами, но не переносит автоматически значение serde по умолчанию. ИМХО, это упущение, которое нужно исправить, поэтому я подал: https://issues.apache.org/jira/browse/KAFKA-7806
Как уже отмечали другие, решение состоит в том, чтобы явно установить последовательность ключей в восходящем направлении, а не полагаться на стандартную серию ключей. Вы можете либо:
Установите serdes для оконной агрегации с помощью Materialized
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.suppress(Suppressed.untilWindowCloses(unbounded())))
.toStream()
.print(Printed.toSysOut())
(как рекомендовал Нишу)
(обратите внимание, что не необходимо назвать операцией count
, побочный эффект которой делает ее запрашиваемой)
Или установить serdes дальше вверх по течению, например, на входе:
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic, Consumed.with(Serdes.String(), Serdes.Double()))
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded())))
.toStream()
.print(Printed.toSysOut())
(как рекомендовано Вардзиниаком)
Выбор за вами; Я думаю, что в этом случае это не слишком сильно отличается в любом случае. Если бы вы выполняли агрегацию, отличную от count
, вы, вероятно, в любом случае задали бы значение serde с помощью Materialized
, поэтому, возможно, первый будет более унифицированным стилем.
Я также заметил, что в вашем определении окна не установлен льготный период. Время закрытия окна определяется как window end + grace period
, а по умолчанию оно составляет 24 часа, поэтому вы не увидите ничего, что выводится из подавления, до тех пор, пока через приложение не пройдут данные за 24 часа.
Для вашего тестирования я бы порекомендовал попробовать:
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ZERO))
В производственном процессе вы захотите выбрать льготный период, который уравновешивает величину запаздывания события, которое вы ожидаете в своем потоке, с величиной скорости передачи, которую вы хотите видеть из подавления.
И последнее замечание. Я заметил, что вы не изменили интервал кэширования или фиксации по умолчанию. В результате вы заметите, что сам оператор count
будет буферизовать обновления на 30 секунд по умолчанию, прежде чем передавать их на подавление. Это хорошая конфигурация для производства, поэтому вы не создаете узкое место на локальном диске или посреднике Kafka. Но это может вас удивить, пока вы тестируете.
Обычно для тестов (или для интерактивного тестирования) я отключаю кэширование и устанавливаю короткий интервал коммитов для максимального здравомыслия разработчика:
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
Извините за оплошность в serde. Я надеюсь, что в скором времени мы получим адрес KAFKA-7806.
Надеюсь, это поможет!