Я пытаюсь сделать mapReduce в Java, используя MongoSpark и rdd (JavaMongoRdd).Поэтому в настоящее время я могу получить свой монго-документ в моем Rdd, но я не знаю, как действовать после этого.На самом деле в моем документе есть поле, которое является датой, и я хочу использовать год в этой дате, чтобы сделать свой mapReduce, но я не нахожу ничего, как это сделать.Итак, я здесь, чтобы спросить вас, есть ли у вас какая-то документация, учебное пособие или даже пример того, как действовать.
Здесь код, я пытаюсь создать пару RDD с документом Монго и годом, чтобы подсчитатьномер документа за каждый год, но я не знаю, как мне поступить:
public String count() {
JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());
JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
logger.info("test 1 :" + rdd.count());
logger.info("test 2 :" + rdd.first().toJson());
/*JavaMongoRDD<Document> newRdd = rdd.withPipeline(
Collections.singletonList(
Document.parse("{ $match: { _id : { $gt : ObjectId(\"5c9e180cdba48525f0df30b9\") } } }")
)
);*/
//logger.info("test 2.5 :" +newRdd.first());
JavaPairRDD<String, Document> pairRdd = rdd
.mapToPair((document) -> new Tuple2(document.getString("date").split(".")[1], document));
logger.info("test 3 :" + pairRdd.first());
//logger.info("test 2 :" + rdd.first().toJson());
//ar
//logger.info("test spark");
return "test";
}
Мои документы MongoDb выглядят так:
"_id" : ObjectId("5c9e180ddba48525f0df30cb"),
"title" : "Redevance: une perte de compétitivité pour l’hydraulique suisse",
"description" : [
"Le Parlement a bouclé, durant cette session de printemps, la révision de la loi sur les forces hydrauliques. La solution adoptée aboutit au statu quo sur le plan de la redevance hydraulique. Le taux maximal de cette taxe reste ainsi fixé à 110 francs par kilowatt théorique, jusqu'à fin 2024. Les..."
],
"date" : "dimanche, 24. mars 2019"