Прежде всего, когда я отлаживал ваш код, я заметил, что он бесконечно зацикливается во внутреннем цикле (while (!string.IsNullOrEmpty(iteratorId))
) и никогда не зацикливается на всех осколках вашего потока (при условии, что у вас> 1). Причина объясняется в https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#getrecords-returns-empty - поскольку производитель никогда не звонил MergeShards
или SplitShards
, они остаются открытыми, поэтому NextShardIterator
никогда не будет NULL
.
Вот почему вы когда-либо видите записи, помещенные в первый шард (или, по крайней мере, я это делал при запуске вашего кода) - вы должны читать из шардов параллельно.
Что касается вашего шаблона использования, вы используете:
ShardIteratorType = ShardIteratorType.AT_TIMESTAMP,
Timestamp = DateTime.MinValue
Таким образом, вы, по сути, говорите Кинезису: «Дайте мне все записи в потоке с начала времен» (или, по крайней мере, до тех пор, пока не истечет срок хранения). Вот почему вы продолжаете видеть одни и те же старые записи в дополнение к новым (опять же, это то, что я видел, когда запускал ваш код).
Вызов GetRecords[Async]
фактически не удаляет записи из потока (см. https://stackoverflow.com/a/25741304/4940707).. Правильный способ использования Kinesis - это перемещение контрольной точки в контрольную точку. Если потребитель должен был сохранить SequenceNumber
из прочитайте последнюю запись и перезапустите как таковой:
ShardIteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER,
StartingSequenceNumber = lastSeenSequenceNumber
Тогда вы увидите только новые записи.