Группа потоков Кафки, основанная на отметке времени - PullRequest
1 голос
/ 29 апреля 2020

Я использую приложение kafka для голосования, где пользователь может выбрать кандидата и изменить выбор в течение 1 часа.

Поскольку это подходит для KTable, я использую приложение kafka stream. Тем не менее, существует требование временного диапазона, то есть мне нужно groupBy().count() только для указанного c временного диапазона (например, с 10: 00-11: 00).

Как мне добиться этого с помощью Kafka Stream API?
Насколько я знаю, Kafka (я использую Kafka 2.3) помещает опубликованную метку времени в метаданные, но как получить к ней доступ? Я подумываю об использовании .filter() на основе отметки времени

Также я вижу документацию окон , но кажется, что время относительно (например, последний 1 час) вместо фиксированного (10: 00- 11:00).

Спасибо

Ответы [ 2 ]

1 голос
/ 29 апреля 2020

Тимоти,

Чтобы получить метку времени записи, вы можете использовать операцию transformValues(). Предоставляемый вами ValuesTransformer имеет доступ к ProcessorContext, и вы можете вызвать ProcessorContex.timestamp() в методе ValueTransformer.transform(). Если отметка времени находится в требуемом диапазоне, верните запись, в противном случае верните ноль. Затем добавьте filter() после transformValues(), чтобы удалить отклоненные записи.

Вот пример, который я думаю будет работать

class GroupByTimestampExample {

  public static void main(String[] args) {

    final StreamsBuilder builder = new StreamsBuilder();
    // You need to update the the time fields these are just placeholders
    long earliest = Instant.now().toEpochMilli();
    long latest = Instant.now().toEpochMilli() + (60 * 60 * 1000);

    final ValueTransformerSupplier<String, String> valueTransformerSupplier = new TimeFilteringTransformer(earliest, latest);

    final KTable<String, Long> voteTable = builder.<String, String>stream("topic")
                                            .transformValues(valueTransformerSupplier)
                                            .filter((k, v) -> v != null)
                                            .groupByKey()
                                            .count();

  }




  static final class TimeFilteringTransformer implements ValueTransformerSupplier<String, String> {

    private final long earliest;
    private final long latest;

    public TimeFilteringTransformer(final long earliest, final long latest) {
      this.earliest = earliest;
      this.latest = latest;
    }

    @Override
    public ValueTransformer<String, String> get() {
      return new ValueTransformer<String, String>() {
        private ProcessorContext processorContext;

        @Override
        public void init(ProcessorContext context) {
          processorContext = context;
        }

        @Override
        public String transform(String value) {
         long ts = processorContext.timestamp();
         if (ts >= earliest && ts <= latest) {
            return value;
         }
         return null;
        }

        @Override
        public void close() {

        }
      };
    }
  }
}

Позвольте мне знать, как это происходит.

1 голос
/ 29 апреля 2020

На самом деле Падающее окно равно Fixed-size, non-overlapping, gap-less windows. В вашем случае использования продолжительность окна составляет один час, и, как ваш пример, будет создано окно 10: 00-11: 00 (начало включительно, конец исключая):

kStream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofHours(1)))
    .count();
...