Синхронизировать записи в БД с динамически масштабируемых микросервисов - PullRequest
0 голосов
/ 29 октября 2019

Фон с примером:

Мы создаем приложение для потоковой обработки, которое получает поток данных, запускает алгоритмы для данных и сохраняет результаты в базе данных. В качестве примера по этому вопросу воспользуемся потоком покупок. У каждой покупки есть географическое местоположение покупки (местоположение магазина или местоположение на основе IP-адреса).

Поток покупок исходит из темы Кафки.

Теперь мы хотим обработать этот потокданные и выводить некоторую статистику. Например, мы хотим увидеть среднюю цену покупки для каждого квадрата 100x100 метров в мире.

Мы хотим иметь возможность динамического масштабирования, чтобы справляться с пиками, а также не использовать ресурсы, которые нам не нужны.

Из всех компонентов нашего решения рассмотрим одну небольшую часть, часть, которая должна обновить базу данных. Допустим, мы храним окончательную статистику в какой-то базе данных, она содержит среднюю цену за каждые квадратные метры 100x100, а также счет и сумму (для пересчета на новые данные).

Учитывая недавно приобретенный предмет,мы находим соответствующий квадрат 100х100, загружаем данные из базы данных, обновляем значения в памяти и затем записываем обновленные значения в базу данных. (На самом деле нам нужно запустить немного более сложный алгоритм, но для примера давайте оставим его простым).

Это звучит просто, но когда мы начинаем думать о масштабировании, это становится более сложным.

Задача:

Чтобы поддерживать высокий масштаб, мы хотим иметь несколько экземпляров службы обработки, которая обрабатывает покупки и обновляет соответствующие квадраты 100x100 в базе данных.

Проблема возникает, когда 2 экземпляра пытаются обработать 2 разных покупки, попадающих в один и тот же квадрат 100x100. В этом случае может возникнуть условие гонки, в результате которого только одна покупка повлияет на конечный результат.

Допустим, события происходят в следующем порядке:

  1. 1-еэкземпляр принимает 1-ю покупку и загружает данные из базы данных (data-A)
  2. 2-й экземпляр принимает 2-ю покупку и загружает данные из базы данных (data-A)
  3. обновления 1-го экземпляраdata-A в data-B и записывает его в базу данных
  4. 2-й экземпляр обновляет data-A в data-C и записывает его в базу данных

В результате data-C рассчитывается по данным A и покупки 2 и при этом отсутствует информация о покупке 1.

Наши мысли

Мы решили решить эту проблему с помощью тематических разделов Kafka. Кафка назначает разделы темы для потребителей, и каждое сообщение попадает в один раздел темы. Мы также можем контролировать разделение, к которому он идет, скажем, по идентификатору квадрата 100x100. Таким образом, все покупки из данного квадрата попадают в один и тот же раздел, в результате чего он поступает к одному и тому же потребителю. Таким образом, никогда не будет 2 экземпляра нашего сервиса, которые хотят обновить один и тот же квадрат.

Это работает нормально, пока мы не захотим применить динамическое масштабирование. Когда потребители Kafka добавляются или удаляются, Kafka переназначает разделы потребителям, что может в некоторых случаях приводить к перемещению раздела от одного потребителя к другому. Это может привести к тому, что 2 экземпляра будут обрабатывать покупки из одного и того же квадрата в течение ограниченного периода времени.

Чтобы преодолеть это, мы подумали об использовании распределенных блокировок, которые могут быть реализованы через MongoDB , Redis и, возможно, другие базы данных. В сочетании с разделением разделов Kafka большую часть времени блокировки не будут блокировать какую-либо обработку, и только в случае динамического масштабирования некоторые рабочие будут блокироваться на небольшой период времени.

У нас есть 2 основныхпроблемы с подходом, который мы нашли. Это сложно и добавляет задержку времени блокировки, когда в большинстве случаев она нам не нужна.

Вопросы:

  1. Есть ли лучший подход для решениянаша проблема?
  2. Будет ли наш подход действительно работать? Мы что-то упускаем из-за того, как работает Кафка?

Редактировать 30.10.2019 12: 00:

Как избежать блокировок:

Чтобы избежать блокировок, о которых мы думалииспользуя условие обновления с MongoDB. Тем самым мы пытаемся реализовать оптимистическую стратегию блокировки.

У нас будет уникальное индексирование свойства квадратного идентификатора, и мы добавим поле версии в каждый документ. Есть 2 варианта:

  1. Документ еще не существует. Когда мы пытаемся прочитать существующие данные, мы не получаем никакого результата, поэтому мы знаем, что документ не существует. Теперь мы создаем первое значение и пытаемся вставить его. Если мы преуспеваем, это означает, что мы первые, кто это делает, если мы терпим неудачу (ошибка уникального ограничения для квадратного идентификатора), это означает, что кто-то еще вставил значение в первый раз. Затем мы перейдем к варианту № 2 ниже.

  2. Документ существует. Когда мы читаем существующие данные, мы сохраняем старую версию. Когда мы хотим обновить документ в БД, мы отправляем новое значение и увеличиваем номер версии на 1. Мы также указываем условие обновления, которое будет идентификатором документа, и версия будет равна старой версии. Когда мы ожидаем результата обновления, мы можем видеть, сколько документов было обновлено. Если обновлений нет, это означает, что версия была изменена, в этот момент мы повторяем весь процесс снова. Если есть 1 обновление, это означает, что мы преуспели.

(Это сообщение в блоге объясняет это более подробно)

Это будет работать толькоесли мы выполним операции чтения и записи с заданным значением большинства для чтения / записи (в противном случае мы потеряем данные).

Теперь возникает вопрос о производительности, которая будет работать быстрее распределенной блокировки или большинстваЧтение / запись обновлений концерна?

...