Постараюсь сделать это как можно более понятным, чтобы пример не требовался, поскольку это должна быть концепция, которую я не правильно нарисовал asp, и я борюсь с проблемой, а не с данными или Spark сам код.
Мне необходимо вставить данные города в их собственную базу данных ( MongoDB ), и я пытаюсь выполнить эти операции как можно быстрее.
Примите во внимание пример DataFrame со следующим, где я хочу сделать upserts против MongoDB на основе, например, year , city и зона .
year - city - zone - num_business - num_vehicles
.
Имея groupedBy эти столбцы, которые я только что ожидал, чтобы выполнить переход в базу данных.
Использование драйвера MongoDB Мне необходимо создать несколько экземпляров WriteConfigs для работы с несколькими базами данных ( 1 база данных на город ).
// the 'getDatabaseWriteConfigsPerCity' method filters the 'df' so it only contains the docs from a single city.
for (cityDBConnection <- getDatabaseWriteConfigsPerCity(df) {
cityDBConnection.getDf.foreach(
... // set MongoDB upsert criteria.
)
}
Работать таким образом можно но при использовании foreachPartition
можно добиться большей производительности, так как я надеюсь, что эти записи DS в DF распространяются на исполнителей, если одновременно используется больше данных.
Однако при использовании foreachPartition
я получаю ошибочные результаты. Erroneus, потому что они кажутся неполными. Счетчики не работают и все такое.
Я подозреваю, что это потому, что среди разделов одни и те же ключи находятся в разных разделах, и только когда они объединяются в мастер, когда они вставляются в MongoDB как одна запись.
Можно ли каким-то образом убедиться, что разделы содержат всего документов, связанных с ключом вставки?
Не знаете, достаточно ли я ясен? , но если это все еще слишком сложно, я обновлю как можно скорее.