Использование apache spark 1.6.0
У меня есть два db: phoenix и postgres с одинаковым количеством записей, и у меня проблемы с производительностью при загрузке данных из таблиц postgres.
Для создания фрейма данных из таблиц Феникс я использую метод org.apache.phoenix.spark.SparkSqlContextFunctions phoenixTableAsDataFrame:
implicit val sc: SparkContext = new SparkContext(new SparkConf().setAppName("App"))
sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "jdbc:phoenix:node01,node02,node03:2181")
implicit val sqlc = new SQLContext(sc)
def findAllPH: DataFrame = {
val predicate: Option[String] = Some(s" operationTime > TO_TIMESTAMP( '30/01/2017 00:00:00' ,'dd/MM/yyyy HH:mm:ss') ");
val df = sqlc.phoenixTableAsDataFrame(
TableName, TableColumns, predicate, conf = configuration
).select("custumer_id")
df
}
Для создания фрейма данных из postgres:
def findAllPS(): DataFrame = {
val df = sqlc.read
.format("jdbc")
.options(
Map("url" -> jdbc:postgresql://server_db:5432/db_name,
"driver" -> "org.postgresql.Driver",
"dbtable" -> s"(select id,name,surname from custumer_table ) t ",
"user" -> "user",
"password" -> "password").load().select("id","name","surname")
df
}
Когда я звоню findAllPH.where("custumer_id = '100000'").show(1)
занимает несколько секунд (благодаря DagScheduler )
Когда я звоню findAllPS.where("id = '100000'").show(1)
, требуется несколько минут, потому что spark загружает все записи перед фильтрацией по идентификатору (кажется, нет DagScheduler)
Так что, если я выполняю sql-соединение с фреймом данных: findAllPH join findAllPS on id = and custumer_id
, это занимает много времени, однако самообладание между findAllPH на custumer_id занимает немного времени
Есть ли способ заставить PS работать как PHOENIX?
Первым решением проблемы было получение всех идентификаторов:
ids = findAllPH.rdd.collect.as[String]
запрос postgres, например:
"dbtable" -> (select id,name,surname from custumer_table where id in (ids(1), ids(2), .... ids(N) ))
Это быстрее, но не так, как ожидалось, потому что функциясобирать очень дорого