Привет, я новичок в Java Spark и уже несколько дней ищу решения.
Я работаю над загрузкой данных MongoDB в таблицу кустов, однако при сохранении AsAsable обнаружил ошибку возникает эта ошибка
com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a StructType(StructField(oid,StringType,true)) (value: BsonString{value='54d3e8aeda556106feba7fa2'})
Я пытался увеличить sampleSize, разные версии go -spark-разъема mon, но не работающие решения.
Не могу Выясните, что является причиной root и какие промежутки между ними необходимо выполнить?
Самым запутанным является то, что у меня есть похожие наборы данных, использующие один и тот же поток без проблем.
схема данных mongodb похожа на вложенную структуру и массив
root
|-- sample: struct (nullable = true)
| |-- parent: struct (nullable = true)
| | |-- expanded: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- distance: integer (nullable = true)
| | | | |-- id: struct (nullable = true)
| | | | | |-- oid: string (nullable = true)
| | | | |-- keys: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | | | |-- name: string (nullable = true)
| | | | |-- parent_id: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- oid: string (nullable = true)
| | | | |-- type: string (nullable = true)
| | |-- id: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- oid: string (nullable = true)
пример данных
"sample": {
"expanded": [
{
"distance": 0,
"type": "domain",
"id": "54d3e17b5cf737074d4065b0",
"parent_id": [
"54d3e1775cf737074d406599"
],
"name": "level2"
},
{
"distance": 1,
"type": "domain",
"id": "54d3e1775cf737074d406599",
"name": "level1"
}
],
"id": [
"54d3e17b5cf737074d4065b0"
]
}
пример кода
public static void main(final String[] args) throws InterruptedException {
// spark session read mongodb
SparkSession mongo_spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("mongo_spark.master", "local")
.config("spark.mongodb.input.uri", "mongodb://localhost:27017/test_db.test_collection")
.enableHiveSupport()
.getOrCreate();
// Create a JavaSparkContext using the SparkSession's SparkContext object
JavaSparkContext jsc = new JavaSparkContext(mongo_spark.sparkContext());
// Load data and infer schema, disregard toDF() name as it returns Dataset
Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();
implicitDS.printSchema();
implicitDS.show();
// createOrReplaceTempView to see if the data being read
// implicitDS.createOrReplaceTempView("my_table");
// implicitDS.printSchema();
// implicitDS.show();
// saveAsTable
implicitDS.write().saveAsTable("my_table");
mongo_spark.sql("SELECT * FROM my_table limit 1").show();
mongo_spark.stop();
}
Если у кого-то возникнут какие-то мысли, я бы быть очень ценным. Спасибо