Потоки Кафки сбрасывают сообщения во время работы с окнами и при перезагрузке - PullRequest
1 голос
/ 02 марта 2020

Я работаю над приложением Kafka Streams со следующей топологией:

private final Initializer<Set<String>> eventInitializer = () -> new HashSet<>();

final StreamsBuilder streamBuilder = new StreamsBuilder();

    final KStream<String, AggQuantityByPrimeValue> eventStreams = streamBuilder.stream("testTopic",
            Consumed.with(Serdes.String(), **valueSerde**));

    final  KStream<String, Value> filteredStreams = eventStreams
                .filter((key,clientRecord)->recordValidator.isAllowedByRules(clientRecord));

    final KGroupedStream<Integer, Value> groupedStreams = filteredStreams.groupBy(
        (key, transactionEntry) -> transactionEntry.getNodeid(),
        Serialized.with(Serdes.Integer(), **valueSerde**));

    /* Hopping window */
    final TimeWindowedKStream<Integer, Value> windowedGroupStreams = groupedStreams
        .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(25))
            .grace(Duration.ofSeconds(0)));

    /* Aggregating the events */
    final KStream<Windowed<Integer>, Set<String>> suppressedStreams = windowedGroupStreams
        .aggregate(eventInitializer, countAggregator, Materialized.as("counts-aggregate")
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
                .withName("suppress-window")
            .toStream();

    suppressedStreams.foreach((windowed, value) -> eventProcessor.publish(windowed.key(), value)); 

    return new KafkaStreams(streamBuilder.build(), config.getKafkaConfigForStreams());

Я наблюдаю, как периодически некоторые события сбрасываются во время / после оконного управления. Например:

  • Все записи можно просмотреть / распечатать в методе isAllowedByRules (), которые действительны (разрешены фильтрами) и используются потоком.
  • Но при печати событий в countAggregator я вижу, что некоторые события не проходят через него.

Текущие конфигурации для потоков:

Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG,"kafka-app-id"
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, <bootstraps-server>); 
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
streamsConfig.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10485760);
streamsConfig.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
streamsConfig.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10485760);
/*For window buffering across all threads*/
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 52428800);

streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, **customSerdesForSet**);

Изначально я использовал окно с переключениями но я обнаружил, что в основном в конце окна теряются некоторые события, поэтому я переключился на переходное окно (лучше дублировать, чем потерять). Затем выпавшие события стали равны нулю. Но сегодня снова, после почти 4 дней, я видел несколько пропущенных событий, и среди них есть одна закономерность: они опаздывают почти на минуту по сравнению с другими событиями, которые были произведены вместе. Но тогда ожидается, что эти поздние события должны произойти в любом будущем windows, но этого не произошло. Поправьте меня здесь, если мое понимание неверно.

Также, как я упоминал в топи c, при перезапуске потоков (изящно) я мог видеть, что несколько событий снова теряются на этапе агрегации, хотя обрабатываются isAllowedByRules () метод.

Я много искал переполнение стека и другие сайты, но не смог найти причину root такого поведения. Это связано с какой-то конфигурацией, которую я пропускаю / неправильно настраиваю, или это может быть связано с какой-то другой причиной?

1 Ответ

1 голос
/ 05 марта 2020

Насколько я понимаю, у вас есть пустой льготный период:

 /* Hopping window */
...
            .grace(Duration.ofSeconds(0))

Итак, ваше окно закрыто без возможности позднего прибытия.

Тогда по вашему подвопросу: But then expectation is that these late events should come in any of the future windows but that didn't happen. Correct me here if my understanding is not right.

Возможно, вы смешиваете время события и время обработки. Ваша запись будет классифицирована как «поздняя», если временная метка записи (добавленная производителем во время создания или посредниками при прибытии в кластер, если не установлена ​​производителем) находится за пределами текущего окна.

Вот пример с двумя записями '*'.

Время их события (et1 и et2) вписывается в окно:

 |    window       |
 t1                t2
 |      *    *     |
       et1  et2          

Но время обработки et2 (pt2) равно фактически следующим образом:

 |    window       |
 t1                t2
 |      *          |   *
       pt1            pt2

Здесь окно представляет собой интервал времени между t1 и t2 (время обработки), et1 и et2 - соответственно время события 2 записей '*'. et1 и et2 - это временные метки, установленные в самих записях. в этом примере et1 и et2 находятся между t1 и t2, et2 были получены после закрытия окна, так как ваш льготный период равен 0, он будет пропущен.

Может быть объяснением

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...