Фон с примером:
Мы создаем приложение для потоковой обработки, которое получает поток данных, запускает алгоритмы для данных и сохраняет результаты в базе данных. В качестве примера по этому вопросу воспользуемся потоком покупок. У каждой покупки есть географическое местоположение покупки (местоположение магазина или местоположение на основе IP-адреса).
Поток покупок исходит из темы Кафки.
Теперь мы хотим обработать этот потокданные и выводить некоторую статистику. Например, мы хотим увидеть среднюю цену покупки для каждого квадрата 100x100 метров в мире.
Мы хотим иметь возможность динамического масштабирования, чтобы справляться с пиками, а также не использовать ресурсы, которые нам не нужны.
Из всех компонентов нашего решения рассмотрим одну небольшую часть, часть, которая должна обновить базу данных. Допустим, мы храним окончательную статистику в какой-то базе данных, она содержит среднюю цену за каждые квадратные метры 100x100, а также счет и сумму (для пересчета на новые данные).
Учитывая недавно приобретенный предмет,мы находим соответствующий квадрат 100х100, загружаем данные из базы данных, обновляем значения в памяти и затем записываем обновленные значения в базу данных. (На самом деле нам нужно запустить немного более сложный алгоритм, но для примера давайте оставим его простым).
Это звучит просто, но когда мы начинаем думать о масштабировании, это становится более сложным.
Задача:
Чтобы поддерживать высокий масштаб, мы хотим иметь несколько экземпляров службы обработки, которая обрабатывает покупки и обновляет соответствующие квадраты 100x100 в базе данных.
Проблема возникает, когда 2 экземпляра пытаются обработать 2 разных покупки, попадающих в один и тот же квадрат 100x100. В этом случае может возникнуть условие гонки, в результате которого только одна покупка повлияет на конечный результат.
Допустим, события происходят в следующем порядке:
- 1-еэкземпляр принимает 1-ю покупку и загружает данные из базы данных (data-A)
- 2-й экземпляр принимает 2-ю покупку и загружает данные из базы данных (data-A)
- обновления 1-го экземпляраdata-A в data-B и записывает его в базу данных
- 2-й экземпляр обновляет data-A в data-C и записывает его в базу данных
В результате data-C рассчитывается по данным A и покупки 2 и при этом отсутствует информация о покупке 1.
Наши мысли
Мы решили решить эту проблему с помощью тематических разделов Kafka. Кафка назначает разделы темы для потребителей, и каждое сообщение попадает в один раздел темы. Мы также можем контролировать разделение, к которому он идет, скажем, по идентификатору квадрата 100x100. Таким образом, все покупки из данного квадрата попадают в один и тот же раздел, в результате чего он поступает к одному и тому же потребителю. Таким образом, никогда не будет 2 экземпляра нашего сервиса, которые хотят обновить один и тот же квадрат.
Это работает нормально, пока мы не захотим применить динамическое масштабирование. Когда потребители Kafka добавляются или удаляются, Kafka переназначает разделы потребителям, что может в некоторых случаях приводить к перемещению раздела от одного потребителя к другому. Это может привести к тому, что 2 экземпляра будут обрабатывать покупки из одного и того же квадрата в течение ограниченного периода времени.
Чтобы преодолеть это, мы подумали об использовании распределенных блокировок, которые могут быть реализованы через MongoDB , Redis и, возможно, другие базы данных. В сочетании с разделением разделов Kafka большую часть времени блокировки не будут блокировать какую-либо обработку, и только в случае динамического масштабирования некоторые рабочие будут блокироваться на небольшой период времени.
У нас есть 2 основныхпроблемы с подходом, который мы нашли. Это сложно и добавляет задержку времени блокировки, когда в большинстве случаев она нам не нужна.
Вопросы:
- Есть ли лучший подход для решениянаша проблема?
- Будет ли наш подход действительно работать? Мы что-то упускаем из-за того, как работает Кафка?
Редактировать 30.10.2019 12: 00:
Как избежать блокировок:
Чтобы избежать блокировок, о которых мы думалииспользуя условие обновления с MongoDB. Тем самым мы пытаемся реализовать оптимистическую стратегию блокировки.
У нас будет уникальное индексирование свойства квадратного идентификатора, и мы добавим поле версии в каждый документ. Есть 2 варианта:
Документ еще не существует. Когда мы пытаемся прочитать существующие данные, мы не получаем никакого результата, поэтому мы знаем, что документ не существует. Теперь мы создаем первое значение и пытаемся вставить его. Если мы преуспеваем, это означает, что мы первые, кто это делает, если мы терпим неудачу (ошибка уникального ограничения для квадратного идентификатора), это означает, что кто-то еще вставил значение в первый раз. Затем мы перейдем к варианту № 2 ниже.
Документ существует. Когда мы читаем существующие данные, мы сохраняем старую версию. Когда мы хотим обновить документ в БД, мы отправляем новое значение и увеличиваем номер версии на 1. Мы также указываем условие обновления, которое будет идентификатором документа, и версия будет равна старой версии. Когда мы ожидаем результата обновления, мы можем видеть, сколько документов было обновлено. Если обновлений нет, это означает, что версия была изменена, в этот момент мы повторяем весь процесс снова. Если есть 1 обновление, это означает, что мы преуспели.
(Это сообщение в блоге объясняет это более подробно)
Это будет работать толькоесли мы выполним операции чтения и записи с заданным значением большинства для чтения / записи (в противном случае мы потеряем данные).
Теперь возникает вопрос о производительности, которая будет работать быстрее распределенной блокировки или большинстваЧтение / запись обновлений концерна?