Как лучше всего обрабатывать конфликты схем преобразования MongoRDD в DataFrame? - PullRequest
1 голос
/ 04 марта 2020

Я пытаюсь прочитать некоторые документы из базы данных mon go и проанализировать схему в искровом DataFrame. До сих пор я успешно читал из mon go и преобразовывал полученный mongoRDD в DataFrame, используя схему, определенную классами case, но есть сценарий, в котором коллекция mon go имеет поле, содержащее несколько типов данных (массив строк против массив вложенных объектов). До сих пор я просто анализировал поле в виде строки, а затем использовал spark sql from_ json () для анализа вложенных объектов в новой схеме, но я обнаружил, что когда поле не соответствует схема, она возвращает ноль для всех полей в схеме, а не просто поле, которое не соответствует. Есть ли способ разобрать это так, чтобы только поля, не соответствующие схеме, возвращали ноль?

//creating mongo test data in mongo shell
db.createCollection("testColl")
db.testColl.insertMany([
    { "foo" : ["fooString1", "fooString2"], "bar" : "barString"},
    { "foo" : [{"uid" : "fooString1"}, {"uid" : "fooString2"}], "bar" : "barString"}
])


import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.functions._
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.types.{StringType, StructField, StructType}

//mongo connector and read config
val testConfig = ReadConfig(Map("uri" -> "mongodb://some.mongo.db",
    "database" -> "testDB",
    "collection" -> "testColl"
  ))



//Option 1: 'lowest common denominator' case class - works, but leaves the nested struct type value as json that then needs additional parsing

case class stringArray (foo: Option[Seq[String]], bar: Option[String])
val df1 : DataFrame = MongoSpark.load(spark.sparkContext, testConfig).toDF[stringArray]
df1.show()
+--------------------+---------+
|                 foo|      bar|
+--------------------+---------+
|[fooString1, fooS...|barString|
|[{ "uid" : "fooSt...|barString|
+--------------------+---------+


//Option 2: accurate case class - fails with:
//com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a StructType(StructField(uid,StringType,true)) (value: BsonString{value='fooString1'})

case class fooDoc (uid: Option[String])
case class docArray (foo: Option[Seq[fooDoc]], bar: Option[String])
val df2 : DataFrame = MongoSpark.load(spark.sparkContext, testConfig).toDF[docArray]


//Option 3: map all rows to json string, then use from_json - why does return null for 'bar' in the case of the schema that doesn't fit?

val mrdd = MongoSpark.load(spark.sparkContext, testConfig)
val jsonRDD = mrdd.map(x => Row(x.toJson()))
val simpleSchema = StructType(Seq(StructField("wholeRecordJson", StringType, true)))
val schema = ScalaReflection.schemaFor[docArray].dataType.asInstanceOf[StructType]
val jsonDF = spark.createDataFrame(jsonRDD, simpleSchema)
val df3 = jsonDF.withColumn("parsed",from_json($"wholeRecordJson", schema))
df3.select("parsed.foo", "parsed.bar").show()
+--------------------+---------+
|                 foo|      bar|
+--------------------+---------+
|                null|     null|
|[[fooString1], [f...|barString|
+--------------------+---------+


//Desired results:
//desired outcome is for only the field not matching the schema (string type of 'foo') is null, but matching columns are populated

+--------------------+---------+
|                 foo|      bar|
+--------------------+---------+
|                null|barString|
|[[fooString1], [f...|barString|
+--------------------+---------+

1 Ответ

1 голос
/ 07 марта 2020

Нет, не существует простого способа сделать это, поскольку несовместимая схема слияния в одной и той же коллекции документов является анти-шаблоном, даже в Mon go.

Существует три основных подхода к решению этой проблемы:

  1. Исправьте данные в MongoDB.

  2. Выполните запрос, который «нормализует» схему Mon go, например, отбрасывает поля с несовместимыми типами или преобразует их или переименовывает их и т. д. c.

  3. Выполняет отдельные запросы к Mon go для документы определенного типа схемы. (В понедельник go есть операторы запросов, которые могут фильтровать по типу поля.) Затем постобработка в Spark и, наконец, объединение данных в один набор данных Spark.

...