Эффективный алгоритм обновления указателя на самый ранний ожидающий журнал работ - PullRequest
1 голос
/ 22 апреля 2020

Есть таблица DynamoDB, которая содержит записи. В этой таблице есть 1 строка, предназначенная только для отслеживания сообщения nextPending. Поле ключа раздела имеет значение nextPending и значение messageId. Все остальные строки таблицы содержат сообщения. Каждая запись имеет messageId, которая уникальна, не имеет пробелов и не уменьшается. На данный момент в этой таблице более 1 млн записей. Записи указывают на произведение. Служба использует эту очередь и обрабатывает каждое сообщение одно за другим. После полной обработки сообщения выполняется две вещи. Во-первых, он устанавливает поле состояния записи в состояние терминала. Затем он обновляет запись nextPending с messageId = предыдущая messageId + 1.

Теперь мы пытаемся сделать этот сервис многопоточным. Вместо обработки сообщений по одному, у нас будет несколько потоков, которые работают с сообщениями параллельно, и эти сообщения могут быть выполнены в произвольном порядке.

Я ищу эффективный и элегантный алгоритм для правильного обновления поля nextPending. Представьте, что nextPending в настоящее время имеет значение 101. Различные потоки в службе работают с сообщениями между 101 и 110. Скажем, они завершаются в следующем порядке: 109, 105, 104, 108, 103, 102, 114, 101, .... Нам нужно обновить nextPending до 105 после того, как мы увидим, что 101 закончен. До тех пор, пока 101 не будет завершено, мы не сможем обновить nextPending, потому что 101 может завершиться ошибкой и потребуется повторная попытка, и, пока это не сделано, nextPending всегда должен указывать на самое раннее ожидающее сообщение.

Один алгоритм Возможно:

После полной обработки сообщения каждый поток выполняет две вещи. Во-первых, он устанавливает поле состояния записи в состояние терминала. Затем он обновляет запись nextPending самой ранней messageId, ожидающей на данный момент. Но это решение требует, чтобы каждый поток считывал несколько записей из DynamoDB и проверял состояние сообщения. Кроме того, несколько потоков теперь будут конкурировать за условное обновление этой строки в таблице. Это также не что-то идеальное.

Может быть другой алгоритм: каждый поток разделяет общее скользящее окно, которое отслеживает все завершенные сообщения. Когда сообщение завершено, выделенный поток проверяет, является ли оно messageId = nextPending + 1. Если да, мы обновляем nextPending, чтобы быть самым большим числом в текущей последовательности завершенных сообщений. В этом методе мы не излишне читаем данные из DynamoDB, у нас также нет нескольких потоков, конкурирующих друг с другом для выполнения одной и той же единицы работы.

Есть идеи получше?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...