Попробуйте отправить сообщения в Kinesis с помощью библиотек Spring Cloud Stream. - PullRequest
0 голосов
/ 02 декабря 2018

Я пишу производителю для отправки сообщений в поток Kinesis с помощью библиотек потокового облака Spring. Я могу успешно отправить данные в kinesis, но на стороне kinesis происходит сбой с исключением пропускной способности, превышенным. Есть способ повторить попытку отправкиэти сообщения снова и точно знать, какое сообщение не удалось?Кроме того, я не хочу использовать KPL или KCL.

Я попробовал решение, предложенное в ответе, и это моя конфигурация:

spring.cloud.stream.bindings.input.producer.errorChannelEnabled: true spring.cloud.stream.bindings.input.producer.error.destination: myFooDestination.myGroup.errors

Это правильный способ, а затем сопоставить «spring.cloud.stream.bindings.input.producer.error.destination: myFooDestination.Свойство myGroup.errors "to" error-channel "в конфигурации весенней интеграции, как показано ниже?

<int-http:inbound-channel-adapter id="abcErrorChannel"
                                      channel="defChannel"
                                      **error-channel="errorChannel**"
                                  </int-http:inbound-channel-adapter>

1 Ответ

0 голосов
/ 03 декабря 2018

Запрос PutRecord(s) в AWS Kinesis Binder полностью основан на AmazonKinesisAsync, и отправка в AWS действительно async .Таким образом, мы не можем использовать встроенную функцию RetryTemplate.Но в то же время ошибка этой асинхронной операции отправляется на адресат errorChannel: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_producer_properties

errorChannelEnabled

When set to true, if the binder supports asynchroous send results, send failures are sent to an error channel for the destination. See “[binder-error-channels]” for more information.

По умолчанию: false.

Имя канала основывается на получателе и группе потребителей, плюс суффикс errors: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#spring-cloud-stream-overview-error-handling

Кроме того, в случае, если выпривязываются к существующему получателю, например:

spring.cloud.stream.bindings.input.destination=myFooDestination
spring.cloud.stream.bindings.input.group=myGroup

полное имя получателя myFooDestination.myGroup, а затем выделенное имя канала ошибок: myFooDestination.myGroup.errors.

...