Выполнить Mysql-запрос в foreachPartition на медленной скорости запуска - PullRequest
0 голосов
/ 09 июня 2018

Я хочу выполнить запрос mysql внутри foreachparition в spark и в конечном итоге получить все результаты запроса во фрейме данных.Это выглядит так:

var rowAccumulator: RowAccumulator = new RowAccumulator

foreachPartition((p) => {
  val result = MysqlService.getData(query, p)
  rowAccumulator.add(result)
})

, затем преобразуйте rowAccumulator во фрейм данных.

Однако со временем он работает медленно.Первый запрос занимает 130 мс, 20-й - 150000 мс, например.Я замечаю, что в MysqlService я каждый раз создаю сессию БД, и это может быть не правильно.Есть ли лучший способ сделать это?

Обновление: MysqlService используется в разных местах, и мы хотим сделать код легким в обслуживании.Если это не будет работать должным образом, мы могли бы применить другой способ выполнения запроса, например, используя spark jdbc.Мне любопытно, по какой причине этот запрос выполняется медленно.

1 Ответ

0 голосов
/ 09 июня 2018

Аккумуляторы Spark не предназначены для обработки больших объемов данных.Он предназначен в первую очередь для сбора вспомогательной статистики с использованием методов, работающих в постоянной памяти (например, счетчиков).

Использование такого аккумулятора - менее эффективный вариант collect (не рекомендуется collect) и недаже отдаленно, если вы

конвертируете rowAccumulator в фрейм данных.

Поскольку вы используете базу данных MySQL, вы должны сначала взглянуть на Spark JDBC-коннектор :

spark.read.jdbc(...)

и только если у вас есть особые требования, используйте пользовательский код.Если вы обрабатываете преобразования напрямую с помощью map

rdd.foreachPartition((p) => {
  MysqlService.getData(query, p)
}).map(x => anyRequiredTransformation(x)).toDF
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...