Не удается загрузить из нескольких баз данных с помощью драйвера Spark-Mongo - PullRequest
1 голос
/ 16 апреля 2019

Я работаю над модулем spark, где мне нужно загрузить коллекции из нескольких источников (баз данных), но я не могу получить коллекцию из второй базы данных.

Базы данных

 DB1
  L_coll1
 DB2
  L_coll2

Логический код

String mst ="local[*]";
        String host= "localhost";
        String port = "27017";
        String DB1 = "DB1";
         String DB2 = "DB2";

         SparkConf conf = new SparkConf().setAppName("cust data").setMaster(mst);
            SparkSession spark = SparkSession
                    .builder() 
                    .config(conf)
                     .config("spark.mongodb.input.uri", "mongodb://"+host+":"+port+"/")
                     .config("spark.mongodb.input.database",DB1)
                     .config("spark.mongodb.input.collection","coll1")
                    .getOrCreate();

            SparkSession spark1 = SparkSession
                    .builder() 
                    .config(conf)
                     .config("spark.mongodb.input.uri", "mongodb://"+host+":"+port+"/")
                     .config("spark.mongodb.input.database",DB2)
                     .config("spark.mongodb.input.collection","coll2")

                    .getOrCreate();

            JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
            JavaSparkContext jsc1 = new JavaSparkContext(spark1.sparkContext());

Чтение конфигурации

ReadConfig readConfig = ReadConfig.create(spark);
            Dataset<Row> MongoDatset =  MongoSpark.load(jsc,readConfig).toDF();

            MongoDatset.show();


            ReadConfig readConfig1 = ReadConfig.create(spark1);
            Dataset<Row> MongoDatset1 =  MongoSpark.load(jsc1,readConfig1).toDF();

            MongoDatset1.show();

После запуска кода about,Я получаю первый набор данных несколько раз.Если я прокомментирую первый экземпляр SparkSession spark, то получу коллекцию только из второго db DB2.

1 Ответ

1 голос
/ 17 апреля 2019

Вместо использования нескольких сеансов зажигания вы можете использовать опцию переопределения ReadConfig для получения нескольких баз данных и коллекций.

Создание сеанса зажигания


 String DB = "DB1";
 String DB1 = "DB2";
 String Coll1 ="Coll1";
 String Coll2 ="Coll2";

SparkSession spark = SparkSession.builder()
                  .master("local")
                  .appName("MongoSparkConnectorIntro")
                  .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
                  .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
                  .getOrCreate();

                // Create a JavaSparkContext using the SparkSession's SparkContext object
                JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

Получить функцию базы данных

private static Dataset<Row> getDB(JavaSparkContext jsc_, String DB, String Coll1) {
        // Create a custom ReadConfig
        Map<String, String> readOverrides = new HashMap<String, String>();
        readOverrides.put("database",DB );
        readOverrides.put("collection", Coll1);
        readOverrides.put("readPreference.name", "secondaryPreferred");

        System.out.println(readOverrides);
        ReadConfig readConfig = ReadConfig.create(jsc_).withOptions(readOverrides);

        return   MongoSpark.load(jsc_,readConfig).toDF();

    }


Использование getDB для создания нескольких баз данных

Dataset<Row> MongoDatset1 = getDB(jsc, DB, Coll1);
Dataset<Row> MongoDatset2 = getDB(jsc, DB1, Coll2);


MongoDatset1.show(1);
MongoDatset2.show(1); 

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...