В настоящее время я экспериментирую с одним из вариантов использования, в котором мы должны объединить несколько входных сообщений в одно, прежде чем оно будет записано в файловую систему. Для этого я пытаюсь использовать модуль агрегатора, доступный здесь https://repo.spring.io/release/org/springframework/cloud/stream/app/aggregator-processor-kafka/2.1.1.RELEASE/
Моя среда - это локальный SCDF / Kafka, работающий на docker Dataflow 2.4.1. Release и Skipper 2.3.1 .Release
определение потока простое, как показано ниже
: inputTopi c> aggregator>: outputTopi c
В свойствах агрегатора я предоставляю эти значения свойств.
release: size() == 5
group-timeout: 5000
correlation: headers.headerName
aggregation: T(org.springframework.util.StringUtils).collectionToDelimitedString(#this.![new String(payload)],'')
message-store-type: simple
У меня есть две проблемы
- Хотя я дал размер релиза 5, и если есть, скажем, 10 сообщений, я не вижу свои выходные topi c для получения двух сообщений
- Вывод topi c содержит только последнее сообщение агрегированного заголовка, а не конкатенированные значения этого списка.
Пожалуйста, сообщите, есть ли какие-либо дополнительные конфигурации, которые помогут добиться агрегированного результата.
Еще одно наблюдение: если у меня есть поток как
http | aggregator > :topic
Та же самая конфигурация агрегатора работает нормально и по желанию. Но в нашем случае данные помещаются в topi c через процессор, использующий KStreams, и мы не уверены, есть ли способ, которым заголовки генерируются и считываются с использованием этих загрузочных приложений по умолчанию в пользовательский процессор.
Еще одно наблюдение во время отладки агрегатора: correlationKey получается или сравнивается через hashCode, а не как строку, не уверен, что это ошибка или особенность. но это то, что вызывает нарушение ожидаемого поведения.