Как извлечь записи полезной нагрузки Collection из выходных данных агрегатора потоков данных Spring в следующем модуле Transformer - PullRequest
0 голосов
/ 06 марта 2019

Я использую SCDF [Spring Cloud Data Flow], и в своем потоке я агрегирую полезные нагрузки, а в следующем модуле преобразования я хотел бы получить размер полученной полезной нагрузки [который представляет собой объект коллекции, выводимый приложением-агрегатором согласно функции агрегирования]

Пример потока:

stream create file_agr_trc_log  --definition "file --directory=/Users/keerthikanth/Documents/GroovySampleCodes/test  --filename-pattern='UNITINFO_DWH.txt' --mode=lines --outputType=plain/text   | aggregator  --release='size() == 1000'  --aggregation=\"#this.![new String(payload)]\" --group-timeout=4 --message-store-type=simple  --correlation='T(Thread).currentThread().id'   | log " --deploy

Файл /Users/keerthikanth/Documents/GroovySampleCodes/test/UNITINFO_DWH.txt содержит

A

B

C

D

E

F

G

H

I

После выполнения потока приложение Aggregator, которое использовало агрегацию [--aggregation = \ "# this.! [New String (payload)] \"], вывело коллекцию, как показано ниже в приложении журнала

[A, B, C, D, E, F, G, H, I]

Актуальная проблема:

Когда я применил новое специальное приложение трансформатора после приложения агрегатораЧтобы найти размер и первую запись коллекции, я получаю неправильные записи, когда пытаюсь получить первую запись коллекции.ion i, e Получение [в качестве первого элемента "," в качестве второго элемента "A" в качестве третьего элемента.

Таким образом, косвенно при выводе полезной нагрузки сбора из модуля агрегатора оно преобразуется в строку.

Я пытался применить spring.cloud.stream.bindings.output.content-type = 'application / x-java-object; type = java.util.List' в агрегаторе и spring.cloud.stream.bindings.input.content-type = 'application / x-java-object; type = java.util.List' в пользовательском преобразователе, но все еще сталкивается с той же проблемой, что и все элементы как часть коллекции, включая [,] charectors

Другой поток с типом контента


stream create file_agr_trc_log  --definition "file --directory=/Users/keerthikanth/Documents/GroovySampleCodes/test  --filename-pattern='UNITINFO_DWH.txt' --mode=lines --outputType=plain/text   | aggregator --aggregation=\"#this.![new String(payload)]\"  --release='size() == 1000' --group-timeout=4 --message-store-type=simple  --correlation='T(Thread).currentThread().id' --spring.cloud.stream.bindings.output.content-type='application/x-java-object;type=java.util.List'  |  transCount --spring.cloud.stream.bindings.input.content-type='application/x-java-object;type=java.util.List'   | log " --deploy

Customer Transformer:

@EnableBinding(Processor.class)
public class processorApp {
  @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
  public String transformer(Collection payloadList)  {
     System.out.println("Size of Payload" + payloadList.size());
    System.out.println("Payload "+ payloadList.iterator().next().toString() );

     return payload;
}

Может кто-нибудь подсказать, как извлечь размер полученной полезной нагрузки сбора из приложения-агрегатора в настраиваемом преобразователе.даже после применения типа контента это не рассматриваетсяДаже с агрегацией по умолчанию также та же ситуация, что и при предоставлении вывода списка, но получение преобразователя с учетом каждого символа как элемента списка

Мой вывод должен быть похож на Первый элемент - общий размер элементов в этой коллекции

A - 8 Размер

1 Ответ

0 голосов
/ 06 марта 2019

Почему вы ожидаете, что это будет коллекция, если ваше приложение агрегации фактически генерирует String на основании предоставленной вами инструкции: --aggregation=\"#this.![new String(payload)]\".

Как правило, вам не нужно указывать любой aggregation вариант для вашего случая, поскольку по умолчанию он равен java.util.List. Пожалуйста, смотрите агрегатор README для более подробной информации

...