Есть таблица DynamoDB, которая содержит записи. В этой таблице есть 1 строка, предназначенная только для отслеживания сообщения nextPending
. Поле ключа раздела имеет значение nextPending
и значение messageId
. Все остальные строки таблицы содержат сообщения. Каждая запись имеет messageId
, которая уникальна, не имеет пробелов и не уменьшается. На данный момент в этой таблице более 1 млн записей. Записи указывают на произведение. Служба использует эту очередь и обрабатывает каждое сообщение одно за другим. После полной обработки сообщения выполняется две вещи. Во-первых, он устанавливает поле состояния записи в состояние терминала. Затем он обновляет запись nextPending
с messageId
= предыдущая messageId + 1
.
Теперь мы пытаемся сделать этот сервис многопоточным. Вместо обработки сообщений по одному, у нас будет несколько потоков, которые работают с сообщениями параллельно, и эти сообщения могут быть выполнены в произвольном порядке.
Я ищу эффективный и элегантный алгоритм для правильного обновления поля nextPending
. Представьте, что nextPending в настоящее время имеет значение 101. Различные потоки в службе работают с сообщениями между 101 и 110. Скажем, они завершаются в следующем порядке: 109, 105, 104, 108, 103, 102, 114, 101, .... Нам нужно обновить nextPending до 105 после того, как мы увидим, что 101 закончен. До тех пор, пока 101 не будет завершено, мы не сможем обновить nextPending
, потому что 101 может завершиться ошибкой и потребуется повторная попытка, и, пока это не сделано, nextPending
всегда должен указывать на самое раннее ожидающее сообщение.
Один алгоритм Возможно:
После полной обработки сообщения каждый поток выполняет две вещи. Во-первых, он устанавливает поле состояния записи в состояние терминала. Затем он обновляет запись nextPending
самой ранней messageId
, ожидающей на данный момент. Но это решение требует, чтобы каждый поток считывал несколько записей из DynamoDB и проверял состояние сообщения. Кроме того, несколько потоков теперь будут конкурировать за условное обновление этой строки в таблице. Это также не что-то идеальное.
Может быть другой алгоритм: каждый поток разделяет общее скользящее окно, которое отслеживает все завершенные сообщения. Когда сообщение завершено, выделенный поток проверяет, является ли оно messageId
= nextPending + 1
. Если да, мы обновляем nextPending
, чтобы быть самым большим числом в текущей последовательности завершенных сообщений. В этом методе мы не излишне читаем данные из DynamoDB, у нас также нет нескольких потоков, конкурирующих друг с другом для выполнения одной и той же единицы работы.
Есть идеи получше?