Прыжок окна Кафки Стримс по размеру N - PullRequest
2 голосов
/ 03 мая 2020

У меня есть поток kafka, и мне нужен процессор, который выполняет следующие действия:

Использует 45-секундное скачкообразное окно с 5-секундным опережением, чтобы вычислить первые 5 отсчетов на основе одного измерения объекта домена. Например, если поток будет содержать данные Clickstream, мне понадобятся первые 5 URL-адресов, которые просматриваются по имени домена, но также отображаются в виде окна с перескоком.

Я видел примеры для подсчета окон, например:

KStream<String, GenericRecord> pageViews = ...;

// Count page views per window, per user, with hopping windows of size 5 minutes that advance every 1 minute
KTable<Windowed<String>, Long> windowedPageViewCounts = pageViews
    .groupByKey(Grouped.with(Serdes.String(), genericAvroSerde))
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5).advanceBy(Duration.ofMinutes(1))))
    .count()

И Top n агрегаций на MusicExample, например:

songPlayCounts.groupBy((song, plays) ->
            KeyValue.pair(TOP_FIVE_KEY,
                new SongPlayCount(song.getId(), plays)),
        Grouped.with(Serdes.String(), songPlayCountSerde))
        .aggregate(TopFiveSongs::new,
            (aggKey, value, aggregate) -> {
              aggregate.add(value);
              return aggregate;
            },
            (aggKey, value, aggregate) -> {
              aggregate.remove(value);
              return aggregate;
            },
            Materialized.<String, TopFiveSongs, KeyValueStore<Bytes, byte[]>>as(TOP_FIVE_SONGS_STORE)
                .withKeySerde(Serdes.String())
                .withValueSerde(topFiveSerde)
        );

Мне просто не удается объединить 2 - где я получаю как оконное, так и верхнее русских скоплений. Есть мысли?

1 Ответ

1 голос
/ 04 мая 2020

В общем, да, однако, для неоконной агрегации top-N алгоритм будет всегда приближенным (невозможно получить точный результат, потому что нужно будет буферизовать все что невозможно для неограниченного ввода). Тем не менее, для скачкообразного окна вы должны выполнить точное вычисление.

Для случая с оконным регистром, фактический шаг агрегации, может просто накапливать все входные записи за окно (например, возвращать List<V> или некоторые другие коллекция). Для этого результата KTable вы применяете функцию mapValues(), которая получает List<V> входных записей для каждого окна (и ключа) и может вычислить фактический результат top-N, который вы ищете.

...