конфликт версий, текущая версия [2] отличается от предоставленной [1] - PullRequest
0 голосов
/ 31 января 2019

У меня есть тема Kafka и приложение Spark.Приложение Spark получает данные из раздела Kafka, предварительно агрегирует их и сохраняет в Elastic Search.Звучит просто, верно?

Все работает нормально, как и ожидалось, но в ту минуту, когда я установил для свойства spark.cores что-то отличное от 1, я начал получать

version conflict, current version [2] is different than the one provided [1]

После небольшого исследования,Я думаю, что ошибка заключается в том, что несколько ядер могут иметь один и тот же документ одновременно, и поэтому, когда одно ядро ​​завершает агрегацию со своей стороны и пытается выполнить обратную запись в документ, он получает эту ошибку

TBH,Я немного удивлен таким поведением, потому что я думал, что Spark и ES справятся с этим самостоятельно.Это заставляет меня поверить, что, возможно, что-то не так с моим подходом.

Как я могу это исправить?Есть ли какая-то концепция «синхронизированного» или «блокирующего», которой я должен следовать?

Приветствия!

Ответы [ 2 ]

0 голосов
/ 04 февраля 2019

Я хотел бы ответить на свой вопрос.В моем случае я обновлял счетчик документов.Итак, все, что мне нужно было сделать, это повторить попытку при возникновении конфликта, потому что мне просто нужно было объединить счетчик.

Мой вариант использования был несколько this :

Для многих случаев частичного обновления не имеет значения, что документ был изменен.Например, если два процесса увеличивают счетчик просмотров страниц, не имеет значения, в каком порядке это происходит;если возникает конфликт, единственное, что нам нужно сделать, это повторить попытку обновления.

Это можно сделать автоматически, установив для параметра retry_on_conflict количество попыток обновления, прежде чем произойдет сбой.;по умолчанию он равен 0.

Благодаря Уиллису и этому блогу я смог настроить параметры Elastic Search, и теперь у меня вообще нет проблем

0 голосов
/ 31 января 2019

Похоже, у вас есть несколько сообщений в очереди, которые все обновляют один и тот же документ ES, и эти сообщения обрабатываются одновременно.Существует два возможных решения:

Во-первых, вы можете использовать разделы Kafka для обеспечения последовательной обработки всех сообщений, которые обновляют один и тот же документ ES.Это предполагает, что в вашем сообщении есть какое-то свойство, которое Kafka может использовать для определения того, как сообщения отображаются в документы ES.

Другой способ - это стандартный способ обработки конфликтов оптимистичного параллелизма: повторите транзакцию.Если у вас есть данные из сообщения Kafka, которые нужно добавить в документ ES, а текущий документ в ES - версия 1, вы можете попытаться обновить его и сохранить обратно версию 2. Но если кто-то уже написал версию 2вы можете повторить попытку, используя версию 2 в качестве отправной точки, добавив новые данные и сохранив версию 3.

Если любой из этих подходов разрушает параллелизм, который вы ожидали получить от Kafka и Spark, то вы можетенужно переосмыслить свой подход.Возможно, вам придется ввести новый этап обработки, который выполняет некоторые тяжелые работы, но на самом деле не пишет в ES, а затем выполните обновления ES в отдельном шаге.

...