Я работаю над приложением Kafka Streams со следующей топологией:
private final Initializer<Set<String>> eventInitializer = () -> new HashSet<>();
final StreamsBuilder streamBuilder = new StreamsBuilder();
final KStream<String, AggQuantityByPrimeValue> eventStreams = streamBuilder.stream("testTopic",
Consumed.with(Serdes.String(), **valueSerde**));
final KStream<String, Value> filteredStreams = eventStreams
.filter((key,clientRecord)->recordValidator.isAllowedByRules(clientRecord));
final KGroupedStream<Integer, Value> groupedStreams = filteredStreams.groupBy(
(key, transactionEntry) -> transactionEntry.getNodeid(),
Serialized.with(Serdes.Integer(), **valueSerde**));
/* Hopping window */
final TimeWindowedKStream<Integer, Value> windowedGroupStreams = groupedStreams
.windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(25))
.grace(Duration.ofSeconds(0)));
/* Aggregating the events */
final KStream<Windowed<Integer>, Set<String>> suppressedStreams = windowedGroupStreams
.aggregate(eventInitializer, countAggregator, Materialized.as("counts-aggregate")
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
.withName("suppress-window")
.toStream();
suppressedStreams.foreach((windowed, value) -> eventProcessor.publish(windowed.key(), value));
return new KafkaStreams(streamBuilder.build(), config.getKafkaConfigForStreams());
Я наблюдаю, как периодически некоторые события сбрасываются во время / после оконного управления. Например:
- Все записи можно просмотреть / распечатать в методе isAllowedByRules (), которые действительны (разрешены фильтрами) и используются потоком.
- Но при печати событий в countAggregator я вижу, что некоторые события не проходят через него.
Текущие конфигурации для потоков:
Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG,"kafka-app-id"
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, <bootstraps-server>);
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
streamsConfig.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10485760);
streamsConfig.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
streamsConfig.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10485760);
/*For window buffering across all threads*/
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 52428800);
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, **customSerdesForSet**);
Изначально я использовал окно с переключениями но я обнаружил, что в основном в конце окна теряются некоторые события, поэтому я переключился на переходное окно (лучше дублировать, чем потерять). Затем выпавшие события стали равны нулю. Но сегодня снова, после почти 4 дней, я видел несколько пропущенных событий, и среди них есть одна закономерность: они опаздывают почти на минуту по сравнению с другими событиями, которые были произведены вместе. Но тогда ожидается, что эти поздние события должны произойти в любом будущем windows, но этого не произошло. Поправьте меня здесь, если мое понимание неверно.
Также, как я упоминал в топи c, при перезапуске потоков (изящно) я мог видеть, что несколько событий снова теряются на этапе агрегации, хотя обрабатываются isAllowedByRules () метод.
Я много искал переполнение стека и другие сайты, но не смог найти причину root такого поведения. Это связано с какой-то конфигурацией, которую я пропускаю / неправильно настраиваю, или это может быть связано с какой-то другой причиной?