поток kafka сделать локальную агрегацию - PullRequest
2 голосов
/ 22 октября 2019

Я пытаюсь выполнить локальную агрегацию.

В теме ввода есть записи, содержащие несколько элементов, и я использую flatmap, чтобы разделить запись на несколько записей с помощью другого ключа (здесь element_id). Это вызывает переразметку, так как я применяю группировку для агрегации позже в процессе потока. Проблема: в этом разделе перераспределения слишком много записей, и приложение не может их обработать (задержка увеличивается).

Вот пример входящих данных

ключ: another ID

значение:

{
  "cat_1": {
    "element_1" : 0,
    "element_2" : 1,
    "element_3" : 0
  },
  "cat_2": {
    "element_1" : 0,
    "element_2" : 1,
    "element_3" : 1
  }
}

И пример желаемого результата агрегации: ключ: element_2 значение:

{
  "cat_1": 1,
  "cat_2": 1
}

Поэтому я хотел бы сделать первый "локальная агрегация »и прекратить разделять входящие записи, что означает, что я хочу агрегировать все элементы локально (без переразделения), например, в окне 30 секунд, а затем производить результат для каждого элемента в теме. Поток, использующий эту тему, позже объединяется на более высоком уровне.

Я использую Stream DSL, но я не уверен, что этого достаточно. Я пытался использовать методы process() и transform(), которые позволяют мне использовать API-интерфейс процессора, но я не знаю, как правильно создавать некоторые записи в пунктуации или помещать записи в поток.

Как я мог этого достичь? Спасибо

1 Ответ

1 голос
/ 22 октября 2019

transform() возвращает KStream, по которому вы можете позвонить to(), чтобы записать результаты в тему.

stream.transform(...).to("output_topic");

В пунктуации вы можете позвонить context.forward(), чтобы отправить запись ниже. Вам все еще нужно позвонить to(), чтобы записать переадресованную запись в тему.

Чтобы реализовать пользовательское агрегирование, рассмотрите следующий псевдо-код:

builder = new StreamsBuilder();
final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
                                Serdes.Integer(),
                                Serdes.Integer());
builder.addStateStore(keyValueStoreBuilder);

stream = builder.stream(topic, Consumed.with(Serdes.Integer(), Serdes.Integer()));
stream.transform(() -> 
    new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {

    private KeyValueStore<Integer, Integer> state;

    @Override
    public void init(final ProcessorContext context) {
        state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
        context.schedule(
            Duration.ofMinutes(1), 
            PunctuationType.STREAM_TIME, 
            timestamp -> {
                // You can get aggregates from the state store here 

                // Then you can send the aggregates downstream 
                // with context.forward();

                // Alternatively, you can output the aggregate in the 
                // transform() method as shown below
            }
        );
    }

    @Override
    public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
        // Get existing aggregates from the state store with state.get().

        // Update aggregates and write them into the state store with state.put().

        // Depending on some condition, e.g., 10 seen records, 
        // output an aggregate downstream by returning the output.
        // You can output multiple aggregates by using KStream#flatTransform().

        // Alternatively, you can output the aggregate in a 
        // punctuation as shown above
    }

    @Override
    public void close() {
    }
}, stateStoreName)

С помощью этого ручного агрегирования вы можете реализовать агрегацию более высокого уровня в том же приложении потоков и использовать повторное разбиение.

process() - терминальная операция, т. Е. Она ничего не возвращает.

...