У меня есть приложение для потокового воспроизведения, которое извлекает записи журнала из Kafka и вставляет все записи журнала доступа в mongodb.Приложение запускается нормально в первые несколько пакетов, но после некоторых пакетов появляются некоторые задачи в задании, требующие довольно много времени для вставки данных в mongodb. Я думаю, что это должно быть проблемой с моей конфигурацией пула соединений mongodb, но я попыталсяменяется довольно много без продвижения по службе.
Вот результаты веб-интерфейса:
время, необходимое для каждой работы
время, необходимое для нештатных задач
Spark: версия-1.5.1 на пряже (ну, может быть, она действительно слишком старая.)
Mongodb: версия 3.4.4, работающая на четырех машинах с 12 осколками. На каждой машине 160G + и 40ЦП.Коды для пула соединений mongodb:
private MongoManager() {
if (mongoClient == null) {
MongoClientOptions.Builder build = new MongoClientOptions.Builder();
build.connectionsPerHost(200);
build.socketTimeout(1000);
build.threadsAllowedToBlockForConnectionMultiplier(200);
build.maxWaitTime(1000 * 60 * 2);
build.connectTimeout(1000 * 60 * 1);
build.writeConcern(WriteConcern.UNACKNOWLEDGED);
MongoClientOptions myOptions = build.build();
try {
ServerAddress serverAddress1 = new ServerAddress(ip1, 20000);
ServerAddress serverAddress2 = new ServerAddress(ip2, 20000);
ServerAddress serverAddress3 = new ServerAddress(ip3, 20000);
ServerAddress serverAddress4 = new ServerAddress(ip4, 20000);
List<ServerAddress> lists = new ArrayList<>(8);
lists.add(serverAddress1);
lists.add(serverAddress2);
lists.add(serverAddress3);
lists.add(serverAddress4);
mongoClient = new MongoClient(lists, myOptions);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void inSertBatch(String dbName, String collectionName, List<DBObject> jsons) {
if (jsons == null || jsons.isEmpty()) {
return;
}
DB db = mongoClient.getDB(dbName);
DBCollection dbCollection = db.getCollection(collectionName);
dbCollection.insert(jsons);
}
А код потоковой передачи искры выглядит следующим образом:
referDstream.foreachRDD(rdd => {
rdd.foreachPartition(partition => {
val records = partition.map(x => {
val data = x._1.split("_")
val dbObject: DBObject = new BasicDBObject()
dbObject.put("xxx","xxx")
...
dbObject
}).toList
val mg: MongoManager = MongoManager.getInstance()
mg.inSertBatch("dbname", "colname", records.asJava)
})
})
Скрипт для подачи заявки:
nohup ${SPARK_HOME}/bin/spark-submit --name ${jobname} --driver-cores 2 --driver-memory 8g
--num-executors 20 --executor-memory 16g --executor-cores 4
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC" --conf "spark.shuffle.manager=hash"
--conf "spark.shuffle.consolidateFiles=true" --driver-java-options "-XX:+UseConcMarkSweepGC"
--master ${master} --class ${APP_MAIN} --jars ${jars_path:1} ${APP_HOME}/${MAINJAR} ${sparkconf} &
Данные получены изОболочка монго:
$ mongostat -h xxx.xxx.xxx.xxx:20000
insert query update delete getmore command flushes mapped vsize res faults qrw arw net_in net_out conn time
*0 *0 *0 *0 0 14|0 0 0B 1.17G 514M 0 0|0 0|0 985b 19.4k 58 Dec 7 03:10:52.949
2999 *0 *0 *0 0 8|0 0 0B 1.17G 514M 0 0|0 0|0 517b 17.6k 58 Dec 7 03:10:53.950
15000 *0 *0 *0 0 19|0 0 0B 1.17G 514M 0 0|0 0|0 402b 17.2k 58 Dec 7 03:10:54.950
17799 *0 *0 *0 0 22|0 0 0B 1.17G 514M 0 0|0 0|0 30.5m 16.9k 58 Dec 7 03:10:55.950
15996 *0 *0 *0 0 18|0 0 0B 1.17G 514M 0 0|0 0|0 343b 16.9k 58 Dec 7 03:10:56.950
12003 *0 *0 *0 0 26|0 0 0B 1.17G 514M 0 0|0 0|0 982b 19.3k 58 Dec 7 03:10:57.949
*0 *0 *0 *0 0 6|0 0 0B 1.17G 514M 0 0|0 0|0 518b 17.6k 58 Dec 7 03:10:58.949
4704 *0 *0 *0 0 8|0 0 0B 1.17G 514M 0 0|0 0|0 10.2m 17.1k 58 Dec 7 03:10:59.950
34600 *0 *0 *0 0 64|0 0 0B 1.17G 526M 0 0|0 0|0 26.9m 16.9k 58 Dec 7 03:11:00.951
33129 *0 *0 *0 0 36|0 0 0B 1.17G 526M 0 0|0 0|0 344b 17.0k 58 Dec 7 03:11:01.949
mongos> db.serverStatus().connections
{ "current" : 57, "available" : 19943, "totalCreated" : 2707 }
Спасибо за любые предложения о том, как решить эту проблему.