Я нашел два вопроса, спрашивающих, почему запись результата не генерируется, если в раздел не вставлена новая запись:
1. "Kafka Stream Suppress session-windowed-aggregation" и
2. «Потоки Кафки (Подавление): закрытие окна времени по таймауту»
В ответах на оба вопроса объяснение состоит в том, что для отправки необходимо отправить новую записьодин.
Я не понимаю, почему выдача записи после истечения времени без новой записи нарушит контракт о подавлении и будет признательна за объяснение.
Лучшее предложение на данный момент - использовать фиктивные записи для запуска выброса.
Я думал, что закрытие и перезапуск потока (топология) может быть более подходящим, чем написание фиктивных записей.Я думал, что новый экземпляр потока увеличит количество записей и выдаст результат, поскольку время ожидания уже истекло.
Однако я попытался и увидел, что он не работает.Буду признателен за объяснение, если это возможно.
@Slf4j
public class KafkaStreamVerticle extends AbstractVerticle {
private KafkaStreams streams;
@Override
public void start(Future<Void> startFuture) throws Exception {
Single.fromCallable(() -> getStreamConfiguration()).subscribe(config -> {
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(KafkaProducerVerticle.TOPIC)
.flatMapValues((k, v) -> List.<JsonObject>of(new JsonObject(v).put("origKey", k)))
.selectKey((k, v) -> v.getString(KafkaProducerVerticle.CATEGORY))
.flatMapValues(v -> List.<String>of(v.toString()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(4)).grace(Duration.ZERO)).count()
// .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())).toStream().foreach((k,
.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(4), BufferConfig.unbounded()))
.toStream().foreach((k, v) -> log.info("********* {}: {} - {}: {}", k.key(),
k.window().start(), k.window().end(), v));
streams = buildAndStartsNewStreamsInstance(config, builder);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
restartStreamsPeriodicaly(config, builder, 30_000L);
log.info("consumer deployed");
startFuture.complete();
});
}
private KafkaStreams buildAndStartsNewStreamsInstance(Properties config,
final StreamsBuilder builder) {
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.cleanUp();
streams.start();
return streams;
}
private void restartStreamsPeriodicaly(Properties config, final StreamsBuilder builder,
@NonNull Long period) {
vertx.setPeriodic(period, l -> {
log.info("restarting streams!!");
streams.close();
streams = buildAndStartsNewStreamsInstance(config, builder);
});
}
private Properties getStreamConfiguration() {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-example");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "suppress-client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
return streamsConfiguration;
}
}