Агрегатор SCDF генерирует только последнюю запись - PullRequest
0 голосов
/ 28 мая 2020

В настоящее время я экспериментирую с одним из вариантов использования, в котором мы должны объединить несколько входных сообщений в одно, прежде чем оно будет записано в файловую систему. Для этого я пытаюсь использовать модуль агрегатора, доступный здесь https://repo.spring.io/release/org/springframework/cloud/stream/app/aggregator-processor-kafka/2.1.1.RELEASE/

Моя среда - это локальный SCDF / Kafka, работающий на docker Dataflow 2.4.1. Release и Skipper 2.3.1 .Release

определение потока простое, как показано ниже

: inputTopi c> aggregator>: outputTopi c

В свойствах агрегатора я предоставляю эти значения свойств.

release: size() == 5
group-timeout: 5000
correlation: headers.headerName
aggregation: T(org.springframework.util.StringUtils).collectionToDelimitedString(#this.![new String(payload)],'')
message-store-type: simple

У меня есть две проблемы

  1. Хотя я дал размер релиза 5, и если есть, скажем, 10 сообщений, я не вижу свои выходные topi c для получения двух сообщений
  2. Вывод topi c содержит только последнее сообщение агрегированного заголовка, а не конкатенированные значения этого списка.

Пожалуйста, сообщите, есть ли какие-либо дополнительные конфигурации, которые помогут добиться агрегированного результата.

Еще одно наблюдение: если у меня есть поток как

http | aggregator > :topic

Та же самая конфигурация агрегатора работает нормально и по желанию. Но в нашем случае данные помещаются в topi c через процессор, использующий KStreams, и мы не уверены, есть ли способ, которым заголовки генерируются и считываются с использованием этих загрузочных приложений по умолчанию в пользовательский процессор.

Еще одно наблюдение во время отладки агрегатора: correlationKey получается или сравнивается через hashCode, а не как строку, не уверен, что это ошибка или особенность. но это то, что вызывает нарушение ожидаемого поведения.

1 Ответ

0 голосов
/ 01 июня 2020

Для тех, кто ищет ответ, я решил изменить корреляцию на

new String[headers.headerName]

и явно настроил сериализатор входных значений на bytearrayserializer

Но вопрос i есть от команды Spring, если мне не нужно прикасаться к агрегатору, есть ли свойство, которое я могу установить в своих свойствах выходного производителя в моем приложении Kstream, разработанном в загрузке 2.2.4, для выдачи заголовков в виде строки вместо массива байтов?

Я использую следующий API для установки заголовков

context.headers().add(key, value.getBytes())

Я пробовал

producer.header-mode: to none/headers.

Но мой агрегатор все еще не агрегировал и давал только последнюю запись. Только после изменения свойств агрегатора заработало. Но я хочу воздержаться от изменения свойства агрегатора, поскольку этот поток не принадлежит нам и является устаревшим приложением, отлично работающим с другими устаревшими потоками.

...