Kafka Streams: следует ли увеличивать время потока для каждой клавиши, чтобы протестировать оконное подавление? - PullRequest
1 голос
/ 09 июля 2020

Я узнал из этого блога и этого учебника , что для проверки подавления с семантикой времени события нужно отправлять фиктивные записи для ускорения времени потока. Этим я пытался ускорить время. Но, похоже, это не сработает, если для определенного ключа не будет продвинуто время.

У меня есть собственный TimestampExtractor, который связывает мое предпочтительное "время потока" с записями. Мой псевдокод топологии потока выглядит следующим образом (я использую API DSL Kafka Streams):

    source.mapValues(someProcessingLambda)
          .flatMap(flattenRecordsLambda)
          .groupByKey(Grouped.with(Serdes.ByteArray(), Serdes.ByteArray()))
          .windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(Duration.ZERO))
          .aggregate(()->null, aggregationLambda)
          .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

Мой ввод имеет следующий формат:

   1 - {"stream_time":"2019-04-09T11:08:36.000-04:00", id:"1", data:"..."}
   2 - {"stream_time":"2019-04-09T11:09:36.000-04:00", id:"1", data:"..."}
   3 - {"stream_time":"2019-04-09T11:18:36.000-04:00", id:"2", data:"..."}
   4 - {"stream_time":"2019-04-09T11:19:36.000-04:00", id:"2", data:"..."}
    .
    .

Теперь записывает 1 и 2 принадлежат 10-минутному окну согласно stream_time, а 3 и 4 принадлежат другому. В этом окне записи агрегируются согласно id. Я ожидал, что запись 3 будет сигнализировать о том, что поток продвинулся вперед, и вызовет подавление для выдачи данных, соответствующих 1-му окну. Однако данные не передаются, пока я не отправлю фиктивную запись с id:1, чтобы увеличить время потока для этого ключа.

Я неправильно понял инструкцию по тестированию? Это ожидаемое поведение? Имеет ли значение ключ фиктивной записи?

1 Ответ

2 голосов
/ 13 июля 2020

Прошу прощения за беспокойство. Это действительно непростая проблема. У меня есть несколько идей по добавлению некоторых операций для поддержки такого рода интеграционного тестирования, но это сложно обойтись без нарушения базовой c семантики времени обработки потока.

Похоже, вы тестируете «настоящий» KafkaStreams приложение, в отличие от тестирования с помощью TopologyTestDriver. Мое первое предложение состоит в том, что вам будет намного удобнее проверять семантику вашего приложения с помощью TopologyTestDriver, если он соответствует вашим потребностям.

Мне кажется, что у вас может быть более одного раздела во входной топологии c (и, следовательно, ваше приложение). Если ключ 1 переходит в один раздел, а ключ 3 - в другой, вы увидите то, что наблюдали. Каждый раздел вашего приложения независимо отслеживает время потока. TopologyTestDriver отлично работает, потому что использует только один раздел, а также потому, что обрабатывает данные синхронно. В противном случае вам придется создавать свои «фиктивные» сообщения о продвижении по времени до go в том же разделе, что и ключ, который вы пытаетесь вывести sh.

Это будет особенно сложно потому что ваш «flatMap (). groupByKey ()» будет перераспределять данные. Вам нужно будет создать фиктивное сообщение, чтобы оно попало в правильный раздел после повторного разделения. Или вы можете поэкспериментировать с написанием своих фиктивных сообщений прямо в топи повторного разбиения c.

Если вам действительно нужно протестировать с KafkaStreams вместо TopologyTestDriver, я думаю, что проще всего просто написать «временное продвижение» сообщение для каждого ключа, как вы предлагали в своем вопросе. Не потому, что это строго необходимо, а потому, что это самый простой способ учесть все эти предостережения. Я также упомяну, что мы работаем над некоторыми общими улучшениями в обработке времени потока в Kafka Streams, которые должны значительно упростить ситуацию, но, конечно, сейчас это вам не поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...