I am trying to connect Spark Streaming with MongoDB in Java. All I need to do is write the stream out using MongoSpark, but I get a strange error at
WriteConfig defaultWriteConfig = WriteConfig.create(jsc).withOptions(writeOverrides);
The error:
java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.output.uri' or 'spark.mongodb.output.database' property
at com.mongodb.spark.config.MongoCompanionConfig.databaseName(MongoCompanionConfig.scala:261)
at com.mongodb.spark.config.MongoCompanionConfig.databaseName$(MongoCompanionConfig.scala:256)
at com.mongodb.spark.config.WriteConfig$.databaseName(WriteConfig.scala:37)
at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:244)
at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)
at com.mongodb.spark.config.MongoCompanionConfig.apply(MongoCompanionConfig.scala:124)
at com.mongodb.spark.config.MongoCompanionConfig.apply$(MongoCompanionConfig.scala:123)
at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)
at com.mongodb.spark.config.MongoCompanionConfig.apply(MongoCompanionConfig.scala:113)
at com.mongodb.spark.config.MongoCompanionConfig.apply$(MongoCompanionConfig.scala:112)
at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)
at com.mongodb.spark.config.MongoCompanionConfig.apply(MongoCompanionConfig.scala:100)
at com.mongodb.spark.config.MongoCompanionConfig.apply$(MongoCompanionConfig.scala:100)
at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)
at com.mongodb.spark.config.WriteConfig$.create(WriteConfig.scala:440)
at com.mongodb.spark.config.WriteConfig.create(WriteConfig.scala)
at spark.SparkConsumer.writeStreamToMongo(SparkConsumer.java:198)
at spark.SparkConsumer.lambda$consumeStream$e3b46054$1(SparkConsumer.java:155)
at org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$1(JavaDStreamLike.scala:272)
at org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$1$adapted(JavaDStreamLike.scala:272)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:628)
at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:628)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:257)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:257)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Sample code:
private static void writeStreamToMongo(JavaRDD<Document> documentJavaRDD) {
    SparkSession spark = null;
    try {
        spark = SparkSession.builder()
                .master("local")
                .appName("MongoSparkConnectorIntro")
                .config("spark.mongodb.output.uri", "mongodb://localhost:27017/")
                .config("spark.mongodb.output.database", "stream")
                .config("spark.mongodb.output.collection", "myCollection")
                .getOrCreate();
    } catch (Exception e) {
        e.printStackTrace();
    }

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // Create a custom WriteConfig
    Map<String, String> writeOverrides = new HashMap<String, String>();
    //writeOverrides.put("uri", "myCollection");
    writeOverrides.put("uri", "mongodb://localhost:27017/");
    writeOverrides.put("database", "stream");
    writeOverrides.put("collection", "myCollection");
    writeOverrides.put("writeConcern.w", "majority");
    writeOverrides.put("spark.mongodb.output.uri", "mongodb://localhost:27017/stream.myCollection");

    try {
        // This is the line that throws "Missing database name"
        WriteConfig defaultWriteConfig = WriteConfig.create(jsc).withOptions(writeOverrides);
        MongoSpark.save(documentJavaRDD, defaultWriteConfig);
    } catch (Exception e) {
        e.printStackTrace();
    }
    jsc.close();
}
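For context, this is roughly how writeStreamToMongo gets called from the stream in consumeStream (simplified sketch; the way each record is turned into a Document is just a placeholder here, the real mapping is more involved):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.bson.Document;

// Simplified calling side: only shows where writeStreamToMongo runs
// inside foreachRDD; the record-to-Document mapping is a placeholder.
private static void consumeStream(JavaDStream<String> stream) {
    stream.foreachRDD(rdd -> {
        // Placeholder: parse each incoming JSON record into a BSON Document
        JavaRDD<Document> documents = rdd.map(Document::parse);
        writeStreamToMongo(documents);
    });
}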
I am not sure why it throws this error. Am I missing something? I would really appreciate it if someone could share their thoughts.
MongoDB version: 4.2, Spark version: 2.4.1, mongo-spark-connector_2.12: 2.4.1, spark-sql_2.12: 2.4.1
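From the error text it looks like the connector wants the database name either in spark.mongodb.output.uri or via spark.mongodb.output.database. This is the kind of configuration I assume it is asking for, with the database and collection embedded in the URI itself (I have not confirmed this is the actual fix, which is part of my question):

// Assumption based on the error message: put database.collection directly
// in the output URI instead of the bare "mongodb://localhost:27017/".
SparkSession spark = SparkSession.builder()
        .master("local")
        .appName("MongoSparkConnectorIntro")
        .config("spark.mongodb.output.uri", "mongodb://localhost:27017/stream.myCollection")
        .getOrCreate();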