Разбиение в Spark запроса из PostgreSQL (JDBC) - PullRequest
0 голосов
/ 25 мая 2018

Я использую spark (с pyspark) в кластерном режиме и читаю данные из RDBMS через JDBC.Я читаю информацию по запросу (не напрямую по таблице)

Я использую параметры для разбиения, такие как numPartitions, upperBound и т. Д. ...

sql = (select ... )

и

df=spark
.read
.jdbc(url=jdbcUrl, table=sql, 
properties=connectionProperties, column="brand_id", lowerBound=1, 
upperBound=12000,numPartitions=10000 )

К сожалению, параметры Spark для преобразования разделов в предложении WHERE в конце сгенерированного запроса, поэтому PostGreSQL полностью прочитал таблицу без индекса использования!

У меня есть один такой запрос

SELECT "brand_id","brand_name","eq_ref_raw","oe","btimestamp" FROM 
  ( select  ... ) 
tab WHERE brand_id >= 5 AND brand_id < 6  

1 Ответ

0 голосов
/ 25 мая 2018

То, что вы пытаетесь сделать, кажется невозможным в текущей версии Spark.Выполненный запрос построен следующим образом:

   val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
    stmt = conn.prepareStatement(sqlText,
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

(см. Org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD # compute)

options.table соответствует (SELECT...) оператор из атрибута table.

Не могли бы вы объяснить, почему вам нужно загрузить столбцы из подзапроса?Если вы выполняете объединения или другие операции SQL внутри этого подзапроса, вы всегда можете «обойти» это и использовать Spark SQL для этого (объединения, операции SQL и т. Д.).


edit:

Как вы объяснили, причина использования вашего подзапроса - извлечение JSONB.Очевидно, что он будет работать лучше, чем собственная операция SQL, но если вы хотите использовать Spark для распараллеливания обработки, IMO вам необходимо объявить обработку JSON на уровне Spark, как показано здесь:

CREATE TABLE jsonb_test (
  content jsonb
);

INSERT INTO jsonb_test (content) VALUES 
('{"first_name": "X", "last_name": "Y"}');

Икод:

val opts = Map("url" -> "jdbc:postgresql://127.0.0.1:5432/spark_jsonb_test",
  "dbtable" -> "jsonb_test", "user" -> "root", "password" -> "root",
  "driver" -> "org.postgresql.Driver")
val schema = StructType(Seq(
  StructField("first_name", StringType, true), StructField("last_name", StringType, true)
))
import sparkSession.implicits._
val personDataFrame = sparkSession.read
  .format("jdbc")
  .options(opts)
  .load()
  .withColumn("person", functions.from_json($"content", schema))

val extractedJsonNames = personDataFrame.collect.map(row => row.get(1).toString)

extractedJsonNames should have size 1
extractedJsonNames(0) shouldEqual "[X,Y]"

Spark поддерживает поля JSONB на всем протяжении PostgresDialect, что при преобразовании типов БД в типы Catalyst JSONB рассматривается как StringType:

  private def toCatalystType(
  typeName: String,
  precision: Int,
  scale: Int): Option[DataType] = typeName match {
      case "bool" => Some(BooleanType)
      case "bit" => Some(BinaryType)
      case "int2" => Some(ShortType)
      case "int4" => Some(IntegerType)
      case "int8" | "oid" => Some(LongType)
      case "float4" => Some(FloatType)
      case "money" | "float8" => Some(DoubleType)
      case "text" | "varchar" | "char" | "cidr" | "inet" | "json" | "jsonb" | "uuid" =>
        Some(StringType)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...