В приложении обработки потока, использующем Spring Cloud Stream, я беру входной поток (с целым числом) и вызываю selectKey
для него, чтобы создать новую тему с теми же значениями, но с другим ключом (строка),Входная тема содержит записи в правильном формате JSON, например:
"key": {
"id": 1
},
"value": {
"id": 1,
"public_id": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a", ...
Проблема заключается в том, что тема, созданная приложением потоковой обработки, имеет value
в виде строки, содержащей JSON, а не в качестве правильного JSON. , т.е.:
"key": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a",
"value": "{\"id\":1,\"publicId\":\"4273b60f-6fe6-40be-8602-d0b3ed2ecf2a\"}"
Код выглядит следующим образом:
@StreamListener
@SendTo("output")
fun process(@Input("input") stream: KStream<Int, MyObj>): KStream<String, MyObj> =
stream.selectKey { _, value -> value.publicId }
То, что выполняет функция выше, потребляет входной поток и генерирует выходной поток (отправляется на output
). Этот выходной поток имеет те же значения, что и входной поток, но просто другой ключ. (В этом случае ключ берется из свойства publicId
значения.)
application.yml
выглядит следующим образом:
spring.cloud.stream:
bindings:
input:
destination: input-topic
output:
destination: output-output
kafka:
streams:
binder:
application-id: test-app-id-1
bindings:
input:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
output:
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
Есть что-то, что я упускаю? Это на самом деле проблема, или это нормально, если JSON хранится в виде строки в сообщениях, генерируемых Spring Cloud Stream?
Другие вещи, которые я пробовал, но ничего не изменили:
- Использование собственного декодирования / кодирования
- Установка
spring.cloud.stream.bindings.output.content-type
в application/json
- Использование
map
вместо selectKey