Агрегирование по нескольким разделам в потоках Кафки - PullRequest
0 голосов
/ 03 июня 2018

Частично это продолжение Агрегирование по определенному разделу в потоках Apache Kafka

Предположим, у меня есть тема под названием «события» с 3 разделами, на которыхЯ отправляю строку -> целочисленные данные примерно так:

(Боб, 3) на раздел 1

(Салли, 4) на раздел 2

(Боб, 2) нараздел 3

...

Я хотел бы объединить значения (в данном примере, просто сумму) по всем разделам, чтобы получить KTable, который выглядит примерно так:

(Салли, 4)

(Боб, 5)

Как уже упоминалось в ответе на вопрос, с которым я связан выше, напрямую выполнить этот вид невозможноперекрестная агрегация.Тем не менее, ответчик отметил, что это возможно, если сообщения имеют одинаковые ключи (что в данном случае верно).Как это может быть достигнуто?

Я также хотел бы иметь возможность запрашивать эти агрегированные значения из «глобального» хранилища состояний, которое реплицируется для каждого экземпляра приложения Kafka Streams.

Моей первой мыслью было использование GlobalKTable (который, я считаю, согласно этой странице , должен быть тем, что мне нужно).Однако раздел журнала изменений для этого хранилища состояний имеет такое же количество разделов, что и исходный раздел «события», и просто выполняет агрегацию для каждого раздела, а не для всех разделов.

Это уменьшенное изображениевниз версия моего приложения - не совсем уверен, куда идти отсюда:

final Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-aggregator");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomDoubleSerde.class.getName());
streamsConfig.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 0);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, Double> eventStream = builder.stream(INCOMING_EVENTS_TOPIC);
KTable<String, Double> aggregatedMetrics = eventStream
        .groupByKey()
        .aggregate(() -> 0d, (key, value, aggregate) -> value + aggregate);

aggregatedMetrics.toStream().print(Printed.<String, Double>toSysOut());
aggregatedMetrics.toStream().to(METRIC_CHANGES_TOPIC);

final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.cleanUp();
streams.start();

builder.globalTable(METRIC_CHANGES_TOPIC, Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as(METRICS_STORE_NAME));

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    streams.close();
}));

1 Ответ

0 голосов
/ 04 июня 2018

Kafka Streams предполагает, что входные темы разбиты по ключам.Это предположение не верно для вашего случая.Таким образом, вам нужно сообщить об этом Kafka Streams.

В вашем конкретном случае вы бы заменили groupByKey на groupBy()

KTable<String, Double> aggregatedMetrics = eventStream
    .groupBy((k,v) -> k)
    .aggregate(() -> 0d, (key, value, aggregate) -> value + aggregate);

Лямбда - это пустышка, которая не модифицируетключ, однако, это подсказка для Kafka Streams перераспределить данные на основе ключа перед выполнением агрегации.

About GlobalKTable: это особый вид таблицы, который не является результатомагрегация, но заполняется только из темы журнала изменений.Похоже, ваш код уже работает правильно: запишите результат агрегирования в тему и перечитайте тему как GlobalKTable.

...