Как эффективно интегрировать Kafka-Spark-MongoDb - PullRequest
0 голосов
/ 01 февраля 2019

Я пишу преобразование Spark 2.4 для бенчмаркинга Spark, который получит потоки JSON из раздела Kafka и должен будет выгружать его в MongoDB.Я могу сделать это с помощью Java MongoClient, но данные могут быть огромными, например, 1 миллион записей, поступающих через несколько потоков от Kafka.Spark обрабатывает его очень быстро, но запись на монго идет очень медленно.

            SparkConf sparkConf = new SparkConf().setMaster("local[*]").
            setAppName("JavaDirectKafkaStreaming"); 

    sparkConf.set("spark.streaming.backpressure.enabled","true");
    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    kafkaParams.put("bootstrap.servers", "loacalhost:9092");
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("group.id", "2");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList("poc-topic");

    final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(streamingContext,
            LocationStrategies.PreferConsistent(),
            org.apache.spark.streaming.kafka010.ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));

    @SuppressWarnings("serial")
    JavaPairDStream<String, String> jPairDStream = stream
            .mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
                public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
                    return new Tuple2<>(record.key(), record.value());
                }
            });

    jPairDStream.foreachRDD(jPairRDD -> {

        jPairRDD.foreach(rdd -> {
            System.out.println("value=" + rdd._2());
            if (rdd._2() != null) {
                System.out.println("inserting=" + rdd._2());

                Document doc = Document.parse(rdd._2());
                // List<Document> list = new ArrayList<>();
                // list.add(doc);
                db.getCollection("collection").insertOne(doc);
                System.out.println("Inserted Data Done");
            }

            else {
                System.out.println("Got no data in this window");
            }

        });
    });
    streamingContext.start();
    streamingContext.awaitTermination();

Где

             MongoClient mongo = new MongoClient("localhost", 27017);
             MongoDatabase db = mongo.getDatabase("mongodb");

Я рассчитываю ускорить операцию монго, как добиться многопоточности для записи монго?(Должен ли я использовать MongoClientOptions для minconnection для каждого хоста?)

Также правильный подход - использовать MongoDriver или он должен быть подключен через соединитель MonogSpark или API-интерфейсом spark writestream ().Если да, то как записать каждый rdd как отдельную запись в mongo, какой-нибудь пример на Java?

1 Ответ

0 голосов
/ 08 февраля 2019

Я не знаю об «эффективно», потому что здесь много факторов.

Например, разделы Kafka и полные исполнители Spark - это всего лишь два значения, которые необходимо настроить для согласования с пропускной способностью.

Я вижу, что вы используете ForEachWriter, что является хорошим способом сделать это, но, возможно, не лучшим, учитывая, что вы постоянно звоните insertOne, по сравнению с использованием Spark Structed Streaming для началас помощью чтения из Kafka, манипулирования вашими данными в объект Struct, затем с использованием SparkSQL Mongo Connector для прямого дампа в коллекции Mongo (который, как я предполагаю, использует транзакции Mongo и вставляет несколько записей одновременно)


Также стоит упомянуть, что Landoop предлагает MongoDB Kafka Connect Sink , для которого требуется один файл конфигурации и не нужно писать код Spark.

...