У меня есть тема Kafka и приложение Spark.Приложение Spark получает данные из раздела Kafka, предварительно агрегирует их и сохраняет в Elastic Search.Звучит просто, верно?
Все работает нормально, как и ожидалось, но в ту минуту, когда я установил для свойства spark.cores что-то отличное от 1, я начал получать
version conflict, current version [2] is different than the one provided [1]
После небольшого исследования,Я думаю, что ошибка заключается в том, что несколько ядер могут иметь один и тот же документ одновременно, и поэтому, когда одно ядро завершает агрегацию со своей стороны и пытается выполнить обратную запись в документ, он получает эту ошибку
TBH,Я немного удивлен таким поведением, потому что я думал, что Spark и ES справятся с этим самостоятельно.Это заставляет меня поверить, что, возможно, что-то не так с моим подходом.
Как я могу это исправить?Есть ли какая-то концепция «синхронизированного» или «блокирующего», которой я должен следовать?
Приветствия!