Я пытаюсь прочитать некоторые документы из базы данных mon go и проанализировать схему в искровом DataFrame. До сих пор я успешно читал из mon go и преобразовывал полученный mongoRDD в DataFrame, используя схему, определенную классами case, но есть сценарий, в котором коллекция mon go имеет поле, содержащее несколько типов данных (массив строк против массив вложенных объектов). До сих пор я просто анализировал поле в виде строки, а затем использовал spark sql from_ json () для анализа вложенных объектов в новой схеме, но я обнаружил, что когда поле не соответствует схема, она возвращает ноль для всех полей в схеме, а не просто поле, которое не соответствует. Есть ли способ разобрать это так, чтобы только поля, не соответствующие схеме, возвращали ноль?
//creating mongo test data in mongo shell
db.createCollection("testColl")
db.testColl.insertMany([
{ "foo" : ["fooString1", "fooString2"], "bar" : "barString"},
{ "foo" : [{"uid" : "fooString1"}, {"uid" : "fooString2"}], "bar" : "barString"}
])
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.functions._
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.types.{StringType, StructField, StructType}
//mongo connector and read config
val testConfig = ReadConfig(Map("uri" -> "mongodb://some.mongo.db",
"database" -> "testDB",
"collection" -> "testColl"
))
//Option 1: 'lowest common denominator' case class - works, but leaves the nested struct type value as json that then needs additional parsing
case class stringArray (foo: Option[Seq[String]], bar: Option[String])
val df1 : DataFrame = MongoSpark.load(spark.sparkContext, testConfig).toDF[stringArray]
df1.show()
+--------------------+---------+
| foo| bar|
+--------------------+---------+
|[fooString1, fooS...|barString|
|[{ "uid" : "fooSt...|barString|
+--------------------+---------+
//Option 2: accurate case class - fails with:
//com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a StructType(StructField(uid,StringType,true)) (value: BsonString{value='fooString1'})
case class fooDoc (uid: Option[String])
case class docArray (foo: Option[Seq[fooDoc]], bar: Option[String])
val df2 : DataFrame = MongoSpark.load(spark.sparkContext, testConfig).toDF[docArray]
//Option 3: map all rows to json string, then use from_json - why does return null for 'bar' in the case of the schema that doesn't fit?
val mrdd = MongoSpark.load(spark.sparkContext, testConfig)
val jsonRDD = mrdd.map(x => Row(x.toJson()))
val simpleSchema = StructType(Seq(StructField("wholeRecordJson", StringType, true)))
val schema = ScalaReflection.schemaFor[docArray].dataType.asInstanceOf[StructType]
val jsonDF = spark.createDataFrame(jsonRDD, simpleSchema)
val df3 = jsonDF.withColumn("parsed",from_json($"wholeRecordJson", schema))
df3.select("parsed.foo", "parsed.bar").show()
+--------------------+---------+
| foo| bar|
+--------------------+---------+
| null| null|
|[[fooString1], [f...|barString|
+--------------------+---------+
//Desired results:
//desired outcome is for only the field not matching the schema (string type of 'foo') is null, but matching columns are populated
+--------------------+---------+
| foo| bar|
+--------------------+---------+
| null|barString|
|[[fooString1], [f...|barString|
+--------------------+---------+