Отсутствует имя базы данных.Устанавливается через свойство «spark.mongodb.output.uri» или «spark.mongodb.output.database» - PullRequest
0 голосов
/ 29 сентября 2019

Я пытаюсь соединить Spark Streams с MongoDB в JAVA .Все, что мне нужно сделать, это написать поток, используя MongoSpark.

Но я получаю странную ошибку при

 WriteConfig defaultWriteConfig = WriteConfig.create(jsc).withOptions(writeOverrides);

Ошибка:

java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.output.uri' or 'spark.mongodb.output.database' property
    at com.mongodb.spark.config.MongoCompanionConfig.databaseName(MongoCompanionConfig.scala:261)
    at com.mongodb.spark.config.MongoCompanionConfig.databaseName$(MongoCompanionConfig.scala:256)
    at com.mongodb.spark.config.WriteConfig$.databaseName(WriteConfig.scala:37)
    at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:244)
    at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)
    at com.mongodb.spark.config.MongoCompanionConfig.apply(MongoCompanionConfig.scala:124)
    at com.mongodb.spark.config.MongoCompanionConfig.apply$(MongoCompanionConfig.scala:123)
    at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)
    at com.mongodb.spark.config.MongoCompanionConfig.apply(MongoCompanionConfig.scala:113)
    at com.mongodb.spark.config.MongoCompanionConfig.apply$(MongoCompanionConfig.scala:112)
    at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)
    at com.mongodb.spark.config.MongoCompanionConfig.apply(MongoCompanionConfig.scala:100)
    at com.mongodb.spark.config.MongoCompanionConfig.apply$(MongoCompanionConfig.scala:100)
    at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)
    at com.mongodb.spark.config.WriteConfig$.create(WriteConfig.scala:440)
    at com.mongodb.spark.config.WriteConfig.create(WriteConfig.scala)
    at spark.SparkConsumer.writeStreamToMongo(SparkConsumer.java:198)
    at spark.SparkConsumer.lambda$consumeStream$e3b46054$1(SparkConsumer.java:155)
    at org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$1(JavaDStreamLike.scala:272)
    at org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$1$adapted(JavaDStreamLike.scala:272)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:257)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:257)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Пример кода :

private static void writeStreamToMongo(JavaRDD<Document> documentJavaRDD){
        SparkSession spark = null;
        try {
            spark = SparkSession.builder()
                    .master("local")
                    .appName("MongoSparkConnectorIntro")
                    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/")
                    .config("spark.mongodb.output.database", "stream")
                    .config("spark.mongodb.output.collection", "myCollection")
                    .getOrCreate();
        }catch (Exception e){
            e.printStackTrace();
        }

        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // Create a custom WriteConfig
        Map<String, String> writeOverrides = new HashMap<String, String>();
        //writeOverrides.put("uri", "myCollection");
        writeOverrides.put("uri", "mongodb://localhost:27017/");
        writeOverrides.put("database", "stream");
        writeOverrides.put("collection", "myCollection");
        writeOverrides.put("writeConcern.w", "majority");
        writeOverrides.put("spark.mongodb.output.uri", "mongodb://localhost:27017/stream.myCollection");

        try {
        WriteConfig defaultWriteConfig = WriteConfig.create(jsc).withOptions(writeOverrides);;
        MongoSpark.save(documentJavaRDD, defaultWriteConfig);
        }catch (Exception e){
            e.printStackTrace();
        }

        jsc.close();


   }

Я не уверен, почему он выдает эту ошибку.Я что-то пропустил?Буду очень признателен, если кто-то сможет поделиться своими мыслями.

Версия MongoDB: 4.2 Версия Spark: 2.4.1 mongo-spark-connector_2.12: 2.4.1 spark-sql_2.12: 2.4.1

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...