Процессор потока данных Spring для обработки одной полезной нагрузки и записи нескольких строк в таблицу данных - PullRequest
0 голосов
/ 29 мая 2020

Я хотел бы задать простой вопрос:

Я реализовал процессор, который обрабатывает одну полезную нагрузку и возвращает массив объектов, например:

@EnableBinding(Processor.class)
public class SimpleProcessor {
    ...
    public SimpleProcessor () {
        ...
    }

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
            public OutgoingEntity[] processData(IncomingEntity payload) {

    // business logic here

    return outgoingEntity; 
}

У меня есть stream в SCDF и промежуточном программном обеспечении как kafka следующим образом:

some source | SimpleProcessor | JDBC sink

для проверки сообщений я заменил приемник журнала на приемник relpace JDB C, и он регистрирует массивы json. Когда я использую приемник JDB C, он выдает исключение и сообщает, что приемник JDB C не может получить доступ к свойствам в объекте, что имеет смысл, поскольку это массив объектов вместо объекта ...

My вопрос:

  1. Могу ли я использовать модификацию своего процессора, чтобы он мог обрабатывать полезную нагрузку один раз и выдавать сообщение несколько раз, например,

    @Transformer(inputChannel = Processor.INPUT)
    public void processData(IncomingEntity payload) {
        ...
        for(OutGoingEntity o: OutgoingEntity[]){ 
            outputMethode();
        }
    }
    
    @Transformer(outputChannel = Processor.OUTPUT)
    private OutGoingEntity outputMethode() {
        .....
        return outGoingEntity; 
    }
    

    Итак, это может передавать несколько объектов json в приемник jdb c и записывать в него данные.

  2. Могу ли я использовать приемник JDB C для работы с массивами? Как?

  3. Могу ли я использовать другие процессоры или раковину, чтобы завершить sh эту задачу?

1 Ответ

0 голосов
/ 07 июля 2020

Как и Маттиас Дж. Сакс, предложенный в комментарии, я использовал метод flatMapValue в KStream для работы с массивом из ввода. Я поставил этот процессор scdf после массива one forward. Это отлично работает.

@EnableBinding(KafkaStreamsProcessor.class)
public class ArrayProcessor {

@StreamListener("input") @SendTo("output")
public KStream<?, String> process(KStream<?, String> payload) {

    return payload.flatMapValues( //impl ) 

...} }
...