Агрегация потоков данных Spring Cloud из разделенного списка - PullRequest
3 голосов
/ 04 мая 2020

Я пытаюсь использовать SCDF для разделения и агрегирования моего списка. Но я не могу агрегировать, как ожидалось. Мой SCDF подпитывается списком по rabbitmq каждую минуту. Допустим, у меня есть список, состоящий из объектов, таких как:

[{"foo" : "11", "bar" : "21"},{"foo" : "11", "bar" : "22"},{"foo" : "12", "bar" : "23"}]

Я разделяю массив на объекты в сплиттере как: app.splitter.expression=#jsonPath(payload,'$.[*]'), и он работает, как и ожидалось.
И в агрегаторе, Я хочу агрегировать их на основе значения свойства foo, чтобы результатом агрегации были два разных массива, например:

[{"foo" : "11", "bar" : "21"},{"foo" : "11", "bar" : "22"}]
[{"foo" : "12", "bar" : "23"}]

Я установил correlation как: app.aggregator.correlation=#jsonPath(payload,'$.foo'), но Я не знаю, как установить release выражение SpEL. Можете ли вы показать мне, как я должен установить release? PS: В качестве обходного пути я попытался добавить последний объект в список, чтобы определить, что это конец списка. Итак, я добавляю {{"foo" : "EoS"} объект в список. И в release я попробовал: app.aggregator.release=!messages.?[new String(payload).contains("EoS")].empty но это не сработало.

...