There are several similar questions on the Internet, but none of them have answers.
I am using the following code to save MongoDB data into Hive, but the exception shown at the end is thrown. I would like to ask how to work around this problem.
I am using:
mongo-spark-connector (Spark 2.1.0, Scala 2.11)
mongo-java-driver 3.10.2
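For reference, here is a minimal build.sbt sketch matching these versions (the Scala patch version and the exact 2.1.x connector version are my assumptions):

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-sql"             % "2.1.0",
  "org.apache.spark"  %% "spark-hive"            % "2.1.0",  // needed for enableHiveSupport / saveAsTable
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.1.0",
  "org.mongodb"       %  "mongo-java-driver"     % "3.10.2"
)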
import com.mongodb.spark.MongoSpark
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object MongoConnector_Test {
  def main(args: Array[String]): Unit = {
    // Point the connector at the source collection db1.t1
    val conf = new SparkConf()
      .set("spark.mongodb.input.uri", "mongodb://user:pass@mongo1:123456/db1.t1")
      .setMaster("local[4]")
      .setAppName("MongoConnectorTest")
    val session = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    // Explicit schema: every field is read as a string
    val schema: StructType = new StructType()
      .add("_id", "string").add("x", "string").add("y", "string").add("z", "string")
    val df = MongoSpark.read(session).schema(schema).load()
    // Write into a Hive table with a unique, timestamped name
    df.write.saveAsTable("MongoConnector_Test" + System.currentTimeMillis())
  }
}
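For what it is worth, the same read can also be written through the generic source API; I would expect the behavior to be identical (a sketch, assuming the connector's DefaultSource class for the 2.1.x line):

// Equivalent read via the generic DataFrameReader API
val df2 = session.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .schema(schema)
  .load()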
But the following exception is thrown:
Caused by: org.bson.BsonInvalidOperationException: Invalid state INITIAL
at org.bson.json.StrictCharacterStreamJsonWriter.checkState(StrictCharacterStreamJsonWriter.java:395)
at org.bson.json.StrictCharacterStreamJsonWriter.writeNull(StrictCharacterStreamJsonWriter.java:192)
at org.bson.json.JsonNullConverter.convert(JsonNullConverter.java:24)
at org.bson.json.JsonNullConverter.convert(JsonNullConverter.java:21)
at org.bson.json.JsonWriter.doWriteNull(JsonWriter.java:206)
at org.bson.AbstractBsonWriter.writeNull(AbstractBsonWriter.java:557)
at org.bson.codecs.BsonNullCodec.encode(BsonNullCodec.java:38)
at org.bson.codecs.BsonNullCodec.encode(BsonNullCodec.java:28)
at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
at org.bson.codecs.BsonValueCodec.encode(BsonValueCodec.java:62)
at com.mongodb.spark.sql.BsonValueToJson$.apply(BsonValueToJson.scala:29)
at com.mongodb.spark.sql.MapFunctions$.bsonValueToString(MapFunctions.scala:103)
at com.mongodb.spark.sql.MapFunctions$.com$mongodb$spark$sql$MapFunctions$$convertToDataType(MapFunctions.scala:78)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:39)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:37)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at com.mongodb.spark.sql.MapFunctions$.documentToRow(MapFunctions.scala:37)
at com.mongodb.spark.sql.MongoRelation$$anonfun$buildScan$2.apply(MongoRelation.scala:45)
at com.mongodb.spark.sql.MongoRelation$$anonfun$buildScan$2.apply(MongoRelation.scala:45)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
... 8 more
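Judging from the trace, the conversion fails while bsonValueToString encodes a BsonNull (note BsonNullCodec and writeNull in the stack), so my guess is that some documents have null, or a missing value, in x, y or z. Below is an untested sketch of a workaround I am considering: filter such documents out with an aggregation pipeline before the connector converts them to rows (the field names are the ones from my schema above):

import com.mongodb.spark.MongoSpark
import org.bson.Document

// Defined at top level, next to MongoConnector_Test; mirrors the schema above
case class Rec(_id: String, x: String, y: String, z: String)

// Untested: { $ne: null } also excludes documents where the field is absent,
// so the connector should never have to encode a bare BsonNull as a string
val cleanedRdd = MongoSpark.load(session.sparkContext)
  .withPipeline(Seq(Document.parse(
    "{ $match: { x: { $ne: null }, y: { $ne: null }, z: { $ne: null } } }")))
val cleanedDf = cleanedRdd.toDF[Rec]()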