Нет записей для сообщений о приостановке потока Kinesis в течение 1 секунды - PullRequest
0 голосов
/ 16 апреля 2019

Я использую пружинный механизм связывания для приема сообщений в пакетном режиме

Иногда я не могу использовать сообщения из потоков. Это не происходит все время

Нет записей для [ShardConsumer {shardOffset = KinesisShardOffset {iteratorType = AFTER_SEQUENCE_NUMBER, SequenceNumber = '49594358705006691330463332232285735104253344290967650306', метка времени = нуль, поток = 'mystream-1', осколок = 'shardId-000000000000', сброс = ложь} , state = CONSUME}] для sequenceNumber [null]. Приостановка потребления в течение [1000] миллисекунд.

Я продолжаю получать это сообщение, и сообщения не используются.

Мой Conf выглядит как ниже

 spring:
  cloud:
    stream:
      bindings:
        input:
          group: mygroup
          destination: mystream-1
          content-type: application/json
        output:
          destination: mystream-2
          content-type: application/json
      kinesis:        
        bindings:
          input:
            consumer:
              listenerMode: batch
              idleBetweenPolls: 60000
              consumer-backoff: 1000
        binder:
          headers: x-item_id,x-message_type
          locks:
            table: lTLocks
            leaseDuration: 30
            refreshPeriod: 3000
          checkpoint:
            table: lTCheckPoints

Есть сообщения в потоке, но я не использую их периодически. Не могли бы вы помочь.

1 Ответ

0 голосов
/ 16 апреля 2019

Я думаю, что ваш магазин контрольных точек и осколок в потоке Kinesis содержат разные порядковые номера.

Пожалуйста, рассмотрите возможность использования таблицы очистки lTCheckPoints перед началом использования из потока.

Я не говорю, что вам нужно делать это постоянно, но, по крайней мере, ради чистой среды тестирования.

...