Некоторые задачи, которые вставляют данные в mongodb, занимают много времени в потоковой передаче искры. - PullRequest
0 голосов
/ 06 декабря 2018

У меня есть приложение для потокового воспроизведения, которое извлекает записи журнала из Kafka и вставляет все записи журнала доступа в mongodb.Приложение запускается нормально в первые несколько пакетов, но после некоторых пакетов появляются некоторые задачи в задании, требующие довольно много времени для вставки данных в mongodb. Я думаю, что это должно быть проблемой с моей конфигурацией пула соединений mongodb, но я попыталсяменяется довольно много без продвижения по службе.

Вот результаты веб-интерфейса:

время, необходимое для каждой работы

время, необходимое для нештатных задач

Spark: версия-1.5.1 на пряже (ну, может быть, она действительно слишком старая.)

Mongodb: версия 3.4.4, работающая на четырех машинах с 12 осколками. На каждой машине 160G + и 40ЦП.Коды для пула соединений mongodb:

private MongoManager() {
        if (mongoClient == null) {
            MongoClientOptions.Builder build = new MongoClientOptions.Builder();
            build.connectionsPerHost(200);
            build.socketTimeout(1000);
            build.threadsAllowedToBlockForConnectionMultiplier(200);
            build.maxWaitTime(1000 * 60 * 2);
            build.connectTimeout(1000 * 60 * 1);
            build.writeConcern(WriteConcern.UNACKNOWLEDGED);
            MongoClientOptions myOptions = build.build();
            try {
                ServerAddress serverAddress1 = new ServerAddress(ip1, 20000);
                ServerAddress serverAddress2 = new ServerAddress(ip2, 20000);
                ServerAddress serverAddress3 = new ServerAddress(ip3, 20000);
                ServerAddress serverAddress4 = new ServerAddress(ip4, 20000);
                List<ServerAddress> lists = new ArrayList<>(8);
                lists.add(serverAddress1);
                lists.add(serverAddress2);
                lists.add(serverAddress3);
                lists.add(serverAddress4);
                mongoClient = new MongoClient(lists, myOptions);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void inSertBatch(String dbName, String collectionName, List<DBObject> jsons) {
            if (jsons == null || jsons.isEmpty()) {
                return;
            }
            DB db = mongoClient.getDB(dbName);
            DBCollection dbCollection = db.getCollection(collectionName);
            dbCollection.insert(jsons);
        }

А код потоковой передачи искры выглядит следующим образом:

referDstream.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    val records = partition.map(x => {
      val data = x._1.split("_")
      val dbObject: DBObject = new BasicDBObject()
      dbObject.put("xxx","xxx")
      ...
      dbObject
    }).toList
    val mg: MongoManager = MongoManager.getInstance()
    mg.inSertBatch("dbname", "colname", records.asJava)
  })
})

Скрипт для подачи заявки:

nohup ${SPARK_HOME}/bin/spark-submit --name ${jobname} --driver-cores 2 --driver-memory 8g 
--num-executors 20 --executor-memory 16g --executor-cores 4
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC" --conf "spark.shuffle.manager=hash" 
--conf "spark.shuffle.consolidateFiles=true" --driver-java-options "-XX:+UseConcMarkSweepGC" 
--master ${master} --class  ${APP_MAIN}  --jars ${jars_path:1}  ${APP_HOME}/${MAINJAR} ${sparkconf} &

Данные получены изОболочка монго:

$ mongostat -h xxx.xxx.xxx.xxx:20000
insert query update delete getmore command flushes mapped vsize  res faults qrw arw net_in net_out conn                time
    *0    *0     *0     *0       0    14|0       0     0B 1.17G 514M      0 0|0 0|0   985b   19.4k   58 Dec  7 03:10:52.949
  2999    *0     *0     *0       0     8|0       0     0B 1.17G 514M      0 0|0 0|0   517b   17.6k   58 Dec  7 03:10:53.950
 15000    *0     *0     *0       0    19|0       0     0B 1.17G 514M      0 0|0 0|0   402b   17.2k   58 Dec  7 03:10:54.950
 17799    *0     *0     *0       0    22|0       0     0B 1.17G 514M      0 0|0 0|0  30.5m   16.9k   58 Dec  7 03:10:55.950
 15996    *0     *0     *0       0    18|0       0     0B 1.17G 514M      0 0|0 0|0   343b   16.9k   58 Dec  7 03:10:56.950
 12003    *0     *0     *0       0    26|0       0     0B 1.17G 514M      0 0|0 0|0   982b   19.3k   58 Dec  7 03:10:57.949
    *0    *0     *0     *0       0     6|0       0     0B 1.17G 514M      0 0|0 0|0   518b   17.6k   58 Dec  7 03:10:58.949
  4704    *0     *0     *0       0     8|0       0     0B 1.17G 514M      0 0|0 0|0  10.2m   17.1k   58 Dec  7 03:10:59.950
 34600    *0     *0     *0       0    64|0       0     0B 1.17G 526M      0 0|0 0|0  26.9m   16.9k   58 Dec  7 03:11:00.951
 33129    *0     *0     *0       0    36|0       0     0B 1.17G 526M      0 0|0 0|0   344b   17.0k   58 Dec  7 03:11:01.949

mongos> db.serverStatus().connections
{ "current" : 57, "available" : 19943, "totalCreated" : 2707 }

Спасибо за любые предложения о том, как решить эту проблему.

...