Потоки Кафки (Подавление): перезапуск топологии - PullRequest
1 голос
/ 08 июня 2019

Я нашел два вопроса, спрашивающих, почему запись результата не генерируется, если в раздел не вставлена ​​новая запись:
1. "Kafka Stream Suppress session-windowed-aggregation" и
2. «Потоки Кафки (Подавление): закрытие окна времени по таймауту»

В ответах на оба вопроса объяснение состоит в том, что для отправки необходимо отправить новую записьодин.

Я не понимаю, почему выдача записи после истечения времени без новой записи нарушит контракт о подавлении и будет признательна за объяснение.

Лучшее предложение на данный момент - использовать фиктивные записи для запуска выброса.

Я думал, что закрытие и перезапуск потока (топология) может быть более подходящим, чем написание фиктивных записей.Я думал, что новый экземпляр потока увеличит количество записей и выдаст результат, поскольку время ожидания уже истекло.

Однако я попытался и увидел, что он не работает.Буду признателен за объяснение, если это возможно.

@Slf4j
public class KafkaStreamVerticle extends AbstractVerticle {

  private KafkaStreams streams;

  @Override
  public void start(Future<Void> startFuture) throws Exception {

    Single.fromCallable(() -> getStreamConfiguration()).subscribe(config -> {

      final StreamsBuilder builder = new StreamsBuilder();

      builder.<String, String>stream(KafkaProducerVerticle.TOPIC)
          .flatMapValues((k, v) -> List.<JsonObject>of(new JsonObject(v).put("origKey", k)))
          .selectKey((k, v) -> v.getString(KafkaProducerVerticle.CATEGORY))
          .flatMapValues(v -> List.<String>of(v.toString()))
          .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
          .windowedBy(TimeWindows.of(Duration.ofSeconds(4)).grace(Duration.ZERO)).count()
          // .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())).toStream().foreach((k,
          .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(4), BufferConfig.unbounded()))
          .toStream().foreach((k, v) -> log.info("********* {}: {} - {}: {}", k.key(),
              k.window().start(), k.window().end(), v));

      streams = buildAndStartsNewStreamsInstance(config, builder);
      Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
      restartStreamsPeriodicaly(config, builder, 30_000L);
      log.info("consumer deployed");
      startFuture.complete();
    });
  }

  private KafkaStreams buildAndStartsNewStreamsInstance(Properties config,
      final StreamsBuilder builder) {
    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.cleanUp();
    streams.start();
    return streams;
  }

  private void restartStreamsPeriodicaly(Properties config, final StreamsBuilder builder,
      @NonNull Long period) {
    vertx.setPeriodic(period, l -> {
      log.info("restarting streams!!");
      streams.close();
      streams = buildAndStartsNewStreamsInstance(config, builder);
    });
  }

  private Properties getStreamConfiguration() {
    final Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-example");
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "suppress-client");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
        Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
        Serdes.String().getClass().getName());
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
    streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
    return streamsConfiguration;
  }
}

1 Ответ

0 голосов
/ 09 июня 2019

Kafka Stream обеспечивает семантику времени события, это означает, что это внутреннее время, только расширенное основание на временных отметках записей (внутреннее время никогда не продвигается в зависимости от времени настенных часов).«Время ожидания», которое вы принимаете, оно также основано на времени события (а не на времени настенных часов).

Предположим, у вас есть окно размера 5 (то есть, [0,5) будет окном)и вы видите данные с ts = 1,2,3.Это означает, что следующая запись может иметь метку времени = 4 и должна содержаться в окне.Однако, если новые данные не поступают, результат окна не может быть передан, независимо от того, как долго вы ждете.Только если поступает запись с отметкой времени = 5, внутреннее время увеличивается и теперь больше времени окончания окна, и результат для окна выводится.Если suppress () выдаст данные после некоторого времени ожидания на основе настенных часов, а следующая запись будет иметь метку времени = 4, это приведет к неверному результату.

Кроме того, suppress () запоминает свое внутреннее состояние и время,Следовательно, даже если вы перезапустите свое приложение, suppress () все равно будет буферизовать данные и все еще будет ждать записи с отметкой времени = 5 для отправки данных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...