Чтобы решить тип данных конфликта, я приводил ноль к строке, как загрузить существующие данные с помощью пользовательской схемы? Искра Java - PullRequest
0 голосов
/ 05 апреля 2020

Я столкнулся с конфликтом типов данных при загрузке mongodb в таблицу кустов, поэтому я изменил схему следующим образом:

    SparkSession spark = SparkSession.builder()
                .master("local[2]")
                .appName("SparkReadMgToHive")
                .config("spark.sql.warehouse.dir", warehouseLocation)
                .config("spark.mongodb.input.uri", "mongodb://localhost:27017/test.testcollection")
                .config("spark.mongodb.input.sampleSize", 500000)
                .enableHiveSupport()
                .getOrCreate();
    JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

    df.createOrReplaceTempView("mongotable");
    Dataset<Row> raw = spark.sql("select * from mongotable");

    StructType raw_schema = raw.schema();
    String raw_schema_json = raw_schema.json().replace(basic_string, replace_basic).replace(array_string, replace_array).replace(null_string, replace_null);
    StructType new_schema = (StructType)DataType.fromJson(attribute_raw_schema_json);

Теперь у меня есть схема, которая преобразует null в строки, но следующий вопрос заключается в том, как я загружаю эта схема.

Я нашел много scala кодов вроде

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") // Use first line of all files as header
    .schema(customSchema)
    .load("sample.csv")

, но я не могу найти эквивалентный код в Java ... Пока я пытался загрузить данные снова, как это

   Dataset<Row> new_df = spark.read().schema(new_schema).load((Seq<String>) MongoSpark.load(sc).toDF());

, но с ошибкой

    Field '_id' contains conflicting types converting to StringType

моя цель - разрешить конфликт данных, который я могу успешно загрузить в mongodb в таблицу улья. Я немного знаю, что мне нужно привести тип, но не уверен, правильный путь к go. Любой комментарий ценится. Большое спасибо

...