То, что вы пытаетесь сделать, кажется невозможным в текущей версии Spark.Выполненный запрос построен следующим образом:
val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
stmt = conn.prepareStatement(sqlText,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
(см. Org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD # compute)
options.table
соответствует (SELECT...) оператор из атрибута table
.
Не могли бы вы объяснить, почему вам нужно загрузить столбцы из подзапроса?Если вы выполняете объединения или другие операции SQL внутри этого подзапроса, вы всегда можете «обойти» это и использовать Spark SQL для этого (объединения, операции SQL и т. Д.).
edit:
Как вы объяснили, причина использования вашего подзапроса - извлечение JSONB.Очевидно, что он будет работать лучше, чем собственная операция SQL, но если вы хотите использовать Spark для распараллеливания обработки, IMO вам необходимо объявить обработку JSON на уровне Spark, как показано здесь:
CREATE TABLE jsonb_test (
content jsonb
);
INSERT INTO jsonb_test (content) VALUES
('{"first_name": "X", "last_name": "Y"}');
Икод:
val opts = Map("url" -> "jdbc:postgresql://127.0.0.1:5432/spark_jsonb_test",
"dbtable" -> "jsonb_test", "user" -> "root", "password" -> "root",
"driver" -> "org.postgresql.Driver")
val schema = StructType(Seq(
StructField("first_name", StringType, true), StructField("last_name", StringType, true)
))
import sparkSession.implicits._
val personDataFrame = sparkSession.read
.format("jdbc")
.options(opts)
.load()
.withColumn("person", functions.from_json($"content", schema))
val extractedJsonNames = personDataFrame.collect.map(row => row.get(1).toString)
extractedJsonNames should have size 1
extractedJsonNames(0) shouldEqual "[X,Y]"
Spark поддерживает поля JSONB на всем протяжении PostgresDialect
, что при преобразовании типов БД в типы Catalyst JSONB рассматривается как StringType
:
private def toCatalystType(
typeName: String,
precision: Int,
scale: Int): Option[DataType] = typeName match {
case "bool" => Some(BooleanType)
case "bit" => Some(BinaryType)
case "int2" => Some(ShortType)
case "int4" => Some(IntegerType)
case "int8" | "oid" => Some(LongType)
case "float4" => Some(FloatType)
case "money" | "float8" => Some(DoubleType)
case "text" | "varchar" | "char" | "cidr" | "inet" | "json" | "jsonb" | "uuid" =>
Some(StringType)