KafkaStreams: получение оконных окончательных результатов - PullRequest
0 голосов
/ 09 января 2019

Можно ли получить оконный конечный результат в потоках Кафки, подавив промежуточные результаты.

Я не могу достичь этой цели. Что не так с моим кодом?

    val builder = StreamsBuilder()
    builder.stream<String,Double>(inputTopic)
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
            .count()
            .suppress(Suppressed.untilWindowCloses(unbounded())) // not working)
            .toStream()
            .print(Printed.toSysOut())

Это приводит к этой ошибке:

Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001: 
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String

Код / Детали ошибки: https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380

Ответы [ 3 ]

0 голосов
/ 09 января 2019

Проблема с KeySerde. Поскольку операция WindowedBy приводит к вводу ключа типа Windowed<String>, а .suppress() использует тип ключа по умолчанию.

Следовательно, вам нужно определить KeySerde в хранилище состояний при вызове метода подсчета, как указано ниже:

      builder.stream<String,Double>inputTopic)
      .groupByKey()
      .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
      .count(Materialized.<String, Long, WindowStore<Bytes,byte[]>>as("count").withCachingDisabled().withKeySerde(Serdes.String()))
      .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
      .toStream()
      . print(Printed.toSysOut());
0 голосов
/ 09 января 2019

Проблема заключается в сбивающей с толку асимметрии в том, что Streams автоматически переносит явные значения serde во время работы с окнами, но не переносит автоматически значение serde по умолчанию. ИМХО, это упущение, которое нужно исправить, поэтому я подал: https://issues.apache.org/jira/browse/KAFKA-7806

Как уже отмечали другие, решение состоит в том, чтобы явно установить последовательность ключей в восходящем направлении, а не полагаться на стандартную серию ключей. Вы можете либо:

Установите serdes для оконной агрегации с помощью Materialized

val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic)
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
        .count(Materialized.with(Serdes.String(), Serdes.Long()))
        .suppress(Suppressed.untilWindowCloses(unbounded())))
        .toStream()
        .print(Printed.toSysOut())

(как рекомендовал Нишу)

(обратите внимание, что не необходимо назвать операцией count, побочный эффект которой делает ее запрашиваемой)

Или установить serdes дальше вверх по течению, например, на входе:

val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic, Consumed.with(Serdes.String(), Serdes.Double()))
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
        .count()
        .suppress(Suppressed.untilWindowCloses(unbounded())))
        .toStream()
        .print(Printed.toSysOut())

(как рекомендовано Вардзиниаком)

Выбор за вами; Я думаю, что в этом случае это не слишком сильно отличается в любом случае. Если бы вы выполняли агрегацию, отличную от count, вы, вероятно, в любом случае задали бы значение serde с помощью Materialized, поэтому, возможно, первый будет более унифицированным стилем.

Я также заметил, что в вашем определении окна не установлен льготный период. Время закрытия окна определяется как window end + grace period, а по умолчанию оно составляет 24 часа, поэтому вы не увидите ничего, что выводится из подавления, до тех пор, пока через приложение не пройдут данные за 24 часа.

Для вашего тестирования я бы порекомендовал попробовать:

.windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ZERO))

В производственном процессе вы захотите выбрать льготный период, который уравновешивает величину запаздывания события, которое вы ожидаете в своем потоке, с величиной скорости передачи, которую вы хотите видеть из подавления.

И последнее замечание. Я заметил, что вы не изменили интервал кэширования или фиксации по умолчанию. В результате вы заметите, что сам оператор count будет буферизовать обновления на 30 секунд по умолчанию, прежде чем передавать их на подавление. Это хорошая конфигурация для производства, поэтому вы не создаете узкое место на локальном диске или посреднике Kafka. Но это может вас удивить, пока вы тестируете.

Обычно для тестов (или для интерактивного тестирования) я отключаю кэширование и устанавливаю короткий интервал коммитов для максимального здравомыслия разработчика:

properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

Извините за оплошность в serde. Я надеюсь, что в скором времени мы получим адрес KAFKA-7806.

Надеюсь, это поможет!

0 голосов
/ 09 января 2019

Добавить Consumed, когда вы создаете поток: builder.stream<String,Double>(inputTopic, Consumed. с (Serdes.String(), Serdes.Double())

...