Я пишу преобразование Spark 2.4 для бенчмаркинга Spark, который получит потоки JSON из раздела Kafka и должен будет выгружать его в MongoDB.Я могу сделать это с помощью Java MongoClient, но данные могут быть огромными, например, 1 миллион записей, поступающих через несколько потоков от Kafka.Spark обрабатывает его очень быстро, но запись на монго идет очень медленно.
SparkConf sparkConf = new SparkConf().setMaster("local[*]").
setAppName("JavaDirectKafkaStreaming");
sparkConf.set("spark.streaming.backpressure.enabled","true");
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(2));
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers", "loacalhost:9092");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("group.id", "2");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("poc-topic");
final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(streamingContext,
LocationStrategies.PreferConsistent(),
org.apache.spark.streaming.kafka010.ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
@SuppressWarnings("serial")
JavaPairDStream<String, String> jPairDStream = stream
.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
return new Tuple2<>(record.key(), record.value());
}
});
jPairDStream.foreachRDD(jPairRDD -> {
jPairRDD.foreach(rdd -> {
System.out.println("value=" + rdd._2());
if (rdd._2() != null) {
System.out.println("inserting=" + rdd._2());
Document doc = Document.parse(rdd._2());
// List<Document> list = new ArrayList<>();
// list.add(doc);
db.getCollection("collection").insertOne(doc);
System.out.println("Inserted Data Done");
}
else {
System.out.println("Got no data in this window");
}
});
});
streamingContext.start();
streamingContext.awaitTermination();
Где
MongoClient mongo = new MongoClient("localhost", 27017);
MongoDatabase db = mongo.getDatabase("mongodb");
Я рассчитываю ускорить операцию монго, как добиться многопоточности для записи монго?(Должен ли я использовать MongoClientOptions для minconnection для каждого хоста?)
Также правильный подход - использовать MongoDriver или он должен быть подключен через соединитель MonogSpark или API-интерфейсом spark writestream ().Если да, то как записать каждый rdd как отдельную запись в mongo, какой-нибудь пример на Java?