Spring Cloud Stream генерирует значение в виде строки, содержащей JSON, а не просто JSON - PullRequest
0 голосов
/ 24 октября 2019

В приложении обработки потока, использующем Spring Cloud Stream, я беру входной поток (с целым числом) и вызываю selectKey для него, чтобы создать новую тему с теми же значениями, но с другим ключом (строка),Входная тема содержит записи в правильном формате JSON, например:

"key": {
  "id": 1
},
"value": {
  "id": 1,
  "public_id": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a", ...

Проблема заключается в том, что тема, созданная приложением потоковой обработки, имеет value в виде строки, содержащей JSON, а не в качестве правильного JSON. , т.е.:

"key": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a",
"value": "{\"id\":1,\"publicId\":\"4273b60f-6fe6-40be-8602-d0b3ed2ecf2a\"}"

Код выглядит следующим образом:

@StreamListener
@SendTo("output")
fun process(@Input("input") stream: KStream<Int, MyObj>): KStream<String, MyObj> =
         stream.selectKey { _, value -> value.publicId }

То, что выполняет функция выше, потребляет входной поток и генерирует выходной поток (отправляется на output). Этот выходной поток имеет те же значения, что и входной поток, но просто другой ключ. (В этом случае ключ берется из свойства publicId значения.)

application.yml выглядит следующим образом:

spring.cloud.stream:
  bindings:
    input:
      destination: input-topic
    output:
      destination: output-output
  kafka:
    streams:
      binder:
        application-id: test-app-id-1
      bindings:
        input:
          consumer:
            keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
        output:
          producer:
            keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde

Есть что-то, что я упускаю? Это на самом деле проблема, или это нормально, если JSON хранится в виде строки в сообщениях, генерируемых Spring Cloud Stream?

Другие вещи, которые я пробовал, но ничего не изменили:

  • Использование собственного декодирования / кодирования
  • Установка spring.cloud.stream.bindings.output.content-type в application/json
  • Использование map вместо selectKey

1 Ответ

1 голос
/ 24 октября 2019

Это означает, что вы отправляете publicId: "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a" в виде строки вместо POJO.

Если это то, что вы отправляете, вы должны использовать StringSerde, а не JsonSerde.

* 1007. * РЕДАКТИРОВАТЬ

Я только что проверил его с приложением Java, и он работает как ожидалось ...

@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class So58538297Application {

    public static void main(String[] args) {
        SpringApplication.run(So58538297Application.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public KStream<String, Foo> process(@Input(Processor.INPUT) KStream<String, Foo> stream) {
        return stream.selectKey((key, value) -> value.getBar());
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        ObjectMapper mapper = new ObjectMapper();
        return args -> {
            template.send(Processor.INPUT, mapper.writeValueAsString(new Foo("baz")));
        };
    }

    @KafkaListener(id = "outputGroup", topics = Processor.OUTPUT)
    public void out(String in, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println("out:" + in + ", key:" + key);
    }

    @KafkaListener(id = "copyOfInput", topics = Processor.INPUT)
    public void in(String in) {
        System.out.println("in:" + in);
    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

    }

}

и

spring.application.name=so58538297

spring.kafka.consumer.auto-offset-reset=earliest

и

in:{"bar":"baz"}
out:{"bar":"baz"}, key:baz
...