Я пытаюсь выполнить локальную агрегацию.
В теме ввода есть записи, содержащие несколько элементов, и я использую flatmap
, чтобы разделить запись на несколько записей с помощью другого ключа (здесь element_id
). Это вызывает переразметку, так как я применяю группировку для агрегации позже в процессе потока. Проблема: в этом разделе перераспределения слишком много записей, и приложение не может их обработать (задержка увеличивается).
Вот пример входящих данных
ключ: another ID
значение:
{
"cat_1": {
"element_1" : 0,
"element_2" : 1,
"element_3" : 0
},
"cat_2": {
"element_1" : 0,
"element_2" : 1,
"element_3" : 1
}
}
И пример желаемого результата агрегации: ключ: element_2
значение:
{
"cat_1": 1,
"cat_2": 1
}
Поэтому я хотел бы сделать первый "локальная агрегация »и прекратить разделять входящие записи, что означает, что я хочу агрегировать все элементы локально (без переразделения), например, в окне 30 секунд, а затем производить результат для каждого элемента в теме. Поток, использующий эту тему, позже объединяется на более высоком уровне.
Я использую Stream DSL, но я не уверен, что этого достаточно. Я пытался использовать методы process()
и transform()
, которые позволяют мне использовать API-интерфейс процессора, но я не знаю, как правильно создавать некоторые записи в пунктуации или помещать записи в поток.
Как я мог этого достичь? Спасибо