Загрузить Spark Dataframe из таблицы Posgres - PullRequest
0 голосов
/ 08 февраля 2019

Использование apache spark 1.6.0

У меня есть два db: phoenix и postgres с одинаковым количеством записей, и у меня проблемы с производительностью при загрузке данных из таблиц postgres.

Для создания фрейма данных из таблиц Феникс я использую метод org.apache.phoenix.spark.SparkSqlContextFunctions phoenixTableAsDataFrame:

implicit val sc: SparkContext = new SparkContext(new SparkConf().setAppName("App"))
sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "jdbc:phoenix:node01,node02,node03:2181")

implicit val sqlc = new SQLContext(sc)
def findAllPH: DataFrame = {
    val predicate: Option[String] = Some(s" operationTime > TO_TIMESTAMP( '30/01/2017 00:00:00' ,'dd/MM/yyyy HH:mm:ss') ");
    val df = sqlc.phoenixTableAsDataFrame(
      TableName, TableColumns, predicate, conf = configuration
    ).select("custumer_id")
    df
}

Для создания фрейма данных из postgres:

    def findAllPS(): DataFrame = {
      val df = sqlc.read
        .format("jdbc")
        .options(
          Map("url" -> jdbc:postgresql://server_db:5432/db_name,
            "driver" -> "org.postgresql.Driver",
            "dbtable" -> s"(select id,name,surname from custumer_table ) t ",
            "user" -> "user",
            "password" -> "password").load().select("id","name","surname")
    df
  }

Когда я звоню findAllPH.where("custumer_id = '100000'").show(1) занимает несколько секунд (благодаря DagScheduler )

Когда я звоню findAllPS.where("id = '100000'").show(1), требуется несколько минут, потому что spark загружает все записи перед фильтрацией по идентификатору (кажется, нет DagScheduler)

Так что, если я выполняю sql-соединение с фреймом данных: findAllPH join findAllPS on id = and custumer_id, это занимает много времени, однако самообладание между findAllPH на custumer_id занимает немного времени

Есть ли способ заставить PS работать как PHOENIX?

Первым решением проблемы было получение всех идентификаторов:

ids = findAllPH.rdd.collect.as[String]

запрос postgres, например:

"dbtable" -> (select id,name,surname from custumer_table where id in (ids(1), ids(2), .... ids(N) )) 

Это быстрее, но не так, как ожидалось, потому что функциясобирать очень дорого

...