Оконная агрегация по количеству событий - PullRequest
1 голос
/ 30 марта 2020

Я сгруппировал свои события kafka:

    private static void createImportStream(final StreamsBuilder builder, final Collection<String> topics) {
        final KStream<byte[], GraphEvent> stream = builder.stream(topics, Consumed.with(Serdes.ByteArray(), new UserEventThriftSerde()));
        stream.filter((key, request) -> {
            return Objects.nonNull(request);
        }).groupBy(
                (key, value) -> Integer.valueOf(value.getSourceType()),
                Grouped.with(Serdes.Integer(), new UserEventThriftSerde()))
              .aggregate(ArrayList::new, (key, value, aggregatedValue) -> {
                          aggregatedValue.add(value);
                          return aggregatedValue;
                      },
                      Materialized.with(Serdes.Integer(), new ArrayListSerde<UserEvent>(new UserEventThriftSerde()))
              ).toStream();
    }

как я могу добавить window, но не по времени, а по количеству событий. Причина в том, что события будут массовым дампом, агрегация с временным окном не подходит, так как все события могут появляться в одни и те же несколько секунд.

1 Ответ

2 голосов
/ 31 марта 2020

Kafka Streams не поддерживает основанные на подсчете windows из коробки, потому что они не детерминированы c, и трудно обрабатывать неупорядоченные данные.

Вместо использования DSL, вы можете использовать Processor API для создания пользовательского оператора для вашего случая использования.

...