EmitterProcessor с Spring Cloud Stream в Kotlin - PullRequest
0 голосов
/ 28 мая 2020

Как в этом примере: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.4.RELEASE/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources

Я создал это:

open class Config {

    @Bean
    open fun emitter(): EmitterProcessor<MyMessageDTO> { return EmitterProcessor.create() }

    @Bean
    open fun inbound(emitter: EmitterProcessor<MyMessageDTO>): Supplier<Flux<MyMessageDTO>> {
        return Supplier { emitter }
    }
}

И это:

@RestController()
@RequestMapping("/inbound")
class InboundController(
    val emitter: EmitterProcessor<MyMessageDTO>,
) {
    @PostMapping("/messagebird/{brandId}/{serviceId}")
    fun ingestMessageBirdReply(
        @RequestBody data: MessageResponse
    ) {
        emitter.onNext(MyMessageDTO(data))
    }
}

И, наконец, этот application.yaml:

spring:
   cloud:
      function.definition: inbound
      stream:
         bindings:
            inbound-out-0:
              inbound-stream
         kinesis:
           binder:
            autoAddShards: true
            autoCreateStream: true
            locks:
              table: inbound_locks
              leaseDuration: 500
            checkpoint:
              table: inbound_kinesis_table

В кинезисе ничего не заканчивается. Если я переключаюсь на использование функции и использования шлюза интеграции Spring, все заканчивается в Kinesis. Если я добавляю другую функцию через определение с конвейером, эта функция действительно вызывается , но конечный результат не заканчивается кинезисом. Есть подсказки, что могло быть не так?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...