Этот код агрегирует между новым CSV-файлом и существующими данными MongoDB.Как существующие, так и новые записи будут сохранены в MongoDB.Этот процесс занимает около 7-8 часов, чтобы завершиться.Я хочу увеличить скорость обработки этого кода.
if (slaveSchemaTable.limit(1).count() != 0) {
//Write new CSV files into MongoDB if the collection is empty. No need checking and validating
MongoSpark.save(csvSchemaTable.write().mode("overwrite"), writeConfig);
} else {
// Aggregating between the new CSV file and the existing MongoDB data. Both existing and new will be saved into MongoDB
slaveSchemaTable.createOrReplaceTempView("SlaveSchemaTable");
// slaveSchemaTable.show();
StringBuilder joinBuilder = new StringBuilder("SELECT b._id as _id,");
joinBuilder.append("a.msisdn as msisdn,");
joinBuilder.append("a.classification as classification,");
joinBuilder.append("a.event_date_actual as event_date_actual,");
joinBuilder.append("(IFNULL(a.up_vol_mb,0), IFNULL(b.up_vol_mb,0)) as up_vol_mb,");
joinBuilder.append("(IFNULL(a.down_vol_mb,0) + IFNULL(b.down_vol_mb,0)) as down_vol_mb,");
joinBuilder.append("(IFNULL(a.total_vol_mb,0) + IFNULL(b.total_vol_mb,0)) as total_vol_mb ");
joinBuilder.append("FROM FinalCsvSchemaTable a LEFT JOIN SlaveSchemaTable b ");
joinBuilder.append("ON (a.msisdn = b.msisdn) AND ");
joinBuilder.append("(a.classification = b.classification) AND ");
joinBuilder.append("(a.event_date_actual = b.event_date_actual) ");
Dataset<Row> slaveTable = sparkSession.sql(joinBuilder.toString());
MongoSpark.save(slaveTable.write().mode("append"), writeConfig);
}
MongoSpark.save(csvSchemaTable.write().mode("overwrite"), writeConfig);
log.debug("End Slave Aggregation Batch");
return true;
Я получаю медленную обработку в части else
кода выше.Я ожидаю увеличения скорости обработки, поэтому для выполнения большого количества файлов требуется максимум 20-30 минут.