Использование Java Spark Load существующего Mongodb для Hive - PullRequest
0 голосов
/ 10 марта 2020

Цель

Я работаю над ETL Mongodb для Hive с использованием Spark (2.3.1) с Java

Где я RN

Я могу загрузить существующий Mongodb и показать / запросить данные

Проблема

, но у меня возникла проблема с сохранением в таблицу улья.

Структура данных Mongodb

Текущие данные mongodb усложняют вложенный dict (тип структуры), есть ли способ преобразования для более простого сохранения в улье?

public static void main(final String[] args) throws InterruptedException {
    // spark session read mongodb
    SparkSession mongo_spark = SparkSession.builder()
            .master("local")
            .appName("MongoSparkConnectorIntro")
            .config("mongo_spark.master", "local")
            .config("spark.mongodb.input.uri", "mongodb://localhost:27017/test_db.test_collection")
            .config("spark.mongodb.output.uri", "mongodb://localhost:27017/test_db.test_collection")
            .enableHiveSupport()
            .getOrCreate();

    // Create a JavaSparkContext using the SparkSession's SparkContext object
    JavaSparkContext jsc = new JavaSparkContext(mongo_spark.sparkContext());

    // Load data and infer schema, disregard toDF() name as it returns Dataset
    Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();
    implicitDS.printSchema();
    implicitDS.show();

    // createOrReplaceTempView
    implicitDS.createOrReplaceTempView("my_table");
    // mongo_spark.sql("DROP TABLE IF EXISTS my_table");
    // cannot save table this step
    // implicitDS.write().saveAsTable("my_table");
    // can query the temp view
    mongo_spark.sql("SELECT * FROM my_table limit 1").show();

    // More application logic would go here...
    JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
    System.out.println(rdd.count());
    System.out.println(rdd.first().toJson());

    jsc.close();
}

Есть ли у кого-нибудь опыт выполнения этой искровой работы ETL в Java? Я очень ценю.

1 Ответ

0 голосов
/ 06 апреля 2020

Когда я работал над этим, я понял, что это широкий вопрос. Точный ответ на этот вопрос:

public static void main(final String[] args) throws InterruptedException {
    // spark session read mongodb
    SparkSession mongo_spark = SparkSession.builder()
            .master("local")
            .appName("MongoSparkConnectorIntro")
            .config("mongo_spark.master", "local")
            .config("spark.mongodb.input.uri", "mongodb://localhost:27017/test_db.test_collection")
            .config("spark.mongodb.output.uri", "mongodb://localhost:27017/test_db.test_collection")
            .enableHiveSupport()
            .getOrCreate();

    // Create a JavaSparkContext using the SparkSession's SparkContext object
    JavaSparkContext jsc = new JavaSparkContext(mongo_spark.sparkContext());

    // Load data and infer schema, disregard toDF() name as it returns Dataset
    Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();
    implicitDS.printSchema();
    implicitDS.show();

    // createOrReplaceTempView
    implicitDS.createOrReplaceTempView("my_table");
    mongo_spark.sql("DROP TABLE IF EXISTS my_table");
    implicitDS.write().saveAsTable("my_table");

    jsc.close();
}

Так что на самом деле код работает, но меня блокирует то, что что-то происходит в моих данных

  1. тип данных конфликта одиночное поле (com.mongodb.spark.exceptions.MongoTypeConversionException: Can't cast…) - это можно решить, увеличив размер выборки во время загрузки, проверьте java синтаксис Как настроить Java Размер образца Spark sparksession

  2. nulltype во вложенной структуре - этот я все еще ищу решение в Java

Как много исследований я получил scala примеров кода Я сделаю все возможное, чтобы записать то, что я нашел, и, надеюсь, это поможет вам сэкономить время

...