Как работает оконное управление Kafka Streams? - PullRequest
0 голосов
/ 31 мая 2018

Мне трудно понять, как работает Windowing в Kafka Streams.Результаты не соответствуют тому, что я прочитал и понял до сих пор.

Я создал поток KSQL с вспомогательной темой.один из 'столбцов' в операторе KSQL SELECT был определен как TIMESTAMP для темы.

CREATE STREAM my_stream WITH (KAFKA_topic='my-stream-topic', VALUE_FORMAT='json', TIMESTAMP='recorded_timestamp') AS select <select list> PARTITION BY PARTITION_KEY;

Записи в my-stream-topic группируются по ключу (PARTITION_KEY) и отображаются в окне с переключениемwindow

val dataWindowed: TimeWindowedKStream[String, TopicValue] = builder.stream('my-stream-topic', consumed) 
    .groupByKey(Serialized.`with`(Serdes.String(), valueSerde))
    .windowedBy(TimeWindows.`of`(TimeUnit.MINUTES.toMillis(5)).advanceBy(TimeUnit.MINUTES.toMillis(1)).until(TimeUnit.MINUTES.toMillis(5)))

Записи агрегируются через

val dataAgg: KTable[Windowed[String], ValueStats] = dataWindowed
    .aggregate(
      new Initializer[TopicStats] {<code omitted>}},
      new Aggregator[String, TopicValue, TopicStats] {<code omitted>}},
      Materialized.`as`[String, TopicStats, WindowStore[Bytes, Array[Byte]]]("time-windowed-aggregated-stream-store")
        .withValueSerde(new JSONSerde[TopicStats])
    )

  val topicStats: KStream[String, TopicValueStats] = dataAgg
    .toStream()
    .map( <code omitted for brevity>)

Затем я печатаю на консоль через

dataAgg.print()
topicStats.print()

Первое в группе окно переводится в 7:00- 7: 05

Когда я проверяю записи в my-stream-topic через консольного потребителя, я вижу, что есть 2 записи, которые должны попадать в указанное выше окно.Однако агрегатор выбирает только 1 из них.

Я думал, что KTable с окном dataAgg будет содержать 1 запись для сгруппированного ключа, но агрегат использовал бы 2 записи для вычисления агрегата.Напечатанное совокупное значение неверно.

Чего мне не хватает?

1 Ответ

0 голосов
/ 01 июня 2018

KSQL может устанавливать метки времени записи при записи, однако вам нужно указывать метку времени при создании входного потока, а не при определении выходного потока.То есть отметка времени, указанная для входного потока, будет использоваться для установки поля метаданных записи при записи.

Это поведение довольно неинтуитивно, и я открыл заявку на эту проблему: https://github.com/confluentinc/ksql/issues/1367

Таким образом, вам нужно указать предложение with(TIMESTAMP='recorded_timestamp') при создании входного потока для запроса, который вы показали в вопросе.Если это невозможно, поскольку ваш запрос должен работать с другой отметкой времени, вам необходимо указать второй запрос, который копирует данные в новую тему.

CREATE STREAM my_stream_with_ts
    WITH (KAFKA_topic='my-stream-topic-with-ts')
AS select * from my_stream PARTITION BY PARTITION_KEY;

В качестве альтернативы вы можете установитьнастраиваемый экстрактор метки времени для вашего приложения Kafka Streams для извлечения метки времени из полезной нагрузки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...