Как в этом примере: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.4.RELEASE/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources
Я создал это:
open class Config {
@Bean
open fun emitter(): EmitterProcessor<MyMessageDTO> { return EmitterProcessor.create() }
@Bean
open fun inbound(emitter: EmitterProcessor<MyMessageDTO>): Supplier<Flux<MyMessageDTO>> {
return Supplier { emitter }
}
}
И это:
@RestController()
@RequestMapping("/inbound")
class InboundController(
val emitter: EmitterProcessor<MyMessageDTO>,
) {
@PostMapping("/messagebird/{brandId}/{serviceId}")
fun ingestMessageBirdReply(
@RequestBody data: MessageResponse
) {
emitter.onNext(MyMessageDTO(data))
}
}
И, наконец, этот application.yaml:
spring:
cloud:
function.definition: inbound
stream:
bindings:
inbound-out-0:
inbound-stream
kinesis:
binder:
autoAddShards: true
autoCreateStream: true
locks:
table: inbound_locks
leaseDuration: 500
checkpoint:
table: inbound_kinesis_table
В кинезисе ничего не заканчивается. Если я переключаюсь на использование функции и использования шлюза интеграции Spring, все заканчивается в Kinesis. Если я добавляю другую функцию через определение с конвейером, эта функция действительно вызывается , но конечный результат не заканчивается кинезисом. Есть подсказки, что могло быть не так?