У нас есть следующий случай:
- Первые 10 осколков в потоке, читатель запущен.
Мы изменили масштаб до 20 осколков через консоль, наше приложение не имеет
соответствующая логика для обработки завершения осколка. Приложению не удалось получить данные от новых шардов
Изменение масштаба до 10 осколков, безрезультатно, перезапуск потребителя не помог
через 3-4 часа мы внедрили исправление, в котором был корректный код завершения работы процессора:
@Override
public void shutdown(ShutdownInput shutdownInput) {
if(ShutdownReason.TERMINATE.equals(shutdownInput.getShutdownReason())) {
try {
shutdownInput.getCheckpointer().checkpoint();
} catch (InvalidStateException | ShutdownException e) {
log.error("Checkpoint failed", e);
}
}
}
В результате мы увидели данные, передаваемые через процессор. в вспомогательной таблице Kinesis значение для поля checkpoint
было обновлено до SHARD_END
. (но checkpointSubSequenceNumber=0
как и раньше).
Примерно через 24 часа мы увидели, что данные одного дня снова начали течь через наш процессор (я уверен). GetRecords.IteratorAgeMilliseconds пошел до 80M +. Отправленные данные уже были обработаны день назад (проверено в журналах и т. Д.).
Правильно ли мы отключили осколок? Получим ли мы эти фальшивые данные через 24 часа? Есть ли объяснение этому поведению?
И да, в таблице для аренды теперь есть 30 записей: 20 для шардов, 10-29 с контрольной точкой = SHARD_END и 10 для более поздних шардов. Я удалил ранее существующие 10 записей для 0-10 шардов, пытаясь перезапустить обработку. Я обеспокоен тем, что lease_counter растет для мертвых осколков.
Писатель не останавливался все это время.