У меня проблемы с производительностью заданий Spark, большая часть данных выполняется только одним исполнителем в кластере, хотя доступно много узлов.
Чтобы разработать логику задания, в основном у нас есть два события и элементы таблицы hbase, задание spark подсчитывает количество событий за определенную продолжительность. Сначала я считываю события из таблицы событий и передаю результаты процессу, где он получить предметы для особых событий, а затем применить некоторую бизнес-логику, а затем подсчитать.
задание занимает 8-10 минут для обработки только 2000 строк, тогда как то же самое задание в mapreduce обрабатывает миллион строк за одно и то же время.
Поскольку я очень новичок в Spark, я не знаю, что не так в коде.
Ниже приведен код для справки
public class ABC{
public static void main(String[] args) throws Exception {
if (args == null || args.length < 3) {
System.out.println("Specify input parameters: <itemshdfspath>, <datefrom> <dateto>");
return;
}
String hbTable = "mevts";
String dateFrom = args[1];
String dateTo = args[2];
String itemsHdfsPath = args[0]; //hdfs path
//Create a SparkContext to initialize
SparkConf sconf = new SparkConf().setAppName(" events ");
// Create a Java version of the Spark Context
JavaSparkContext sc = new JavaSparkContext(sconf);
Processor.jsc = sc;
Processor.hdfsPath = itemsHdfsPath;
Processor.dateFrom=dateFrom;
Processor.dateTo=dateTo;
Configuration conf =HbaseConfiguration.get();
conf.set(TableInputFormat.INPUT_TABLE, hbTable);
String hdfsTarget = itemsHdfsPath ;
Path hdfsItemsOutputn = new Path(hdfsTarget);
hdfsItemsOutputn.getFileSystem(conf).delete(hdfsItemsOutputn,true);
Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);
List<RowRange> ranges = new ArrayList<RowRange>();
String[] prefixes = new String[] { "C", "D"};
for (String prefix : prefixes)
ranges.add(new RowRange(prefix + dateFrom, true, prefix + dateTo, false));
Filter eventsFilter = new MultiRowRangeFilter(ranges);
scan.setFilter(eventsFilter);
ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
conf.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray()));
JavaPairRDD<ImmutableBytesWritable, Result> source = sc
.newAPIHadoopRDD(conf, TableInputFormat.class,
ImmutableBytesWritable.class, Result.class);
Processor.process(source);
}
}
public class Processor {
public static JavaSparkContext jsc;
public static String hdfsPath;
public static String kbsKeyTabFilePath;
public static String kbsUser;
public static String dateFrom;
public static String dateTo;
public static void process(JavaPairRDD<ImmutableBytesWritable, Result> source) {
Broadcast<String[]> bcParams = jsc.broadcast(new String[] { hdfsPath, kbsKeyTabFilePath, kbsUser, dateFrom,
dateTo });
List<Tuple2<String, Long>> res = source
.map(f -> Bytes.toString(f._2.getRow()))
.mapPartitions(p -> {
String[] params = bcParams.value();
Configuration conf = params[1] == null ? HbaseConfiguration.get()
:HbaseConfiguration.get(params[1], params[2]);
HTable mailItemsTable = new HTable(conf, "");
// RefDataCache refDataCache=RefDataCache.fromJson(params[7]);
Iterable<String> iterable = () -> p;
return StreamSupport.stream(iterable.spliterator(), false).map(s -> {
try {
String key = s.toString();
long fromDatel = Long.parseLong(params[3]);
long toDatel = Long.parseLong(params[4]);
String EventName = key.substring(0, 2);
String itemID = key.toString().substring(14);
Get g = new Get(Bytes.toBytes(itemID));
g.setMaxVersions(1);
Result item = mailItemsTable.get(g);
String itemId = Bytes.toString(item.getRow());
List<Cell> getCOrDCells = item.listCells().stream()
.filter(c -> (Bytes.toString(CellUtil.cloneQualifier(c)).substring(0, 2).equalsIgnoreCase(EventName))
&& Long.parseLong(Bytes.toString(CellUtil.cloneQualifier(c)).substring(2))>=fromDatel
&& Long.parseLong(Bytes.toString(CellUtil.cloneQualifier(c)).substring(2))<toDatel)
.collect(Collectors.toList());
mailItemsTable.close();//
if (getCOrDCells == null || getCOrDCells.size() < 1)
return null;
String eventName = null,eventcode = null;
Cell getEarliestSendingEvent = getCOrDCells.get(0);
eventName = Bytes.toString(CellUtil.cloneQualifier(getEarliestSendingEvent));
String evtDT = eventName.substring(2);
JSONObject evtObj = new JSONObject(Bytes.toString(CellUtil.cloneValue(getEarliestSendingEvent)));
eventcode = eventName.substring(0, 2);
String recipientId = evtObj.getString("r");
String senderId = evtObj.getString("s");
String senderOpeCode = senderId.substring(0, 2) + "A";
String recipientOpeCode = recipientId.substring(0, 2) + "A";
Item Item = new Item();
Item.ItemID = itemId;
Item.OrigOp = senderOpeCode;
Item.DestOp = recipientOpeCode;
if (eventcode.equalsIgnoreCase("C"))
Item.CDT = evtDT;
else
Item.DDT = evtDT;
if (Item == null)
return null;
List<String> colNames = new ArrayList<String>();
List<String> colValues = new ArrayList<String>();
Item.storeToHBase(colNames, colValues);
StringBuilder valBuilders = new StringBuilder();
for (int c = 0; c < colValues.size(); c++) {
if (c != 0)
valBuilders.append("\t");
valBuilders.append(colValues.get(c));
}
return valBuilders.toString();
} catch (Exception ex) {
System.err.println(ex.getMessage());
return (String) null; // this will throw exception further
// in spark
}
}).iterator();
})
.flatMapToPair(f -> {
String itemDetails = f.toString();
Item Item = new Item(itemDetails);
List<Tuple2<String, Long>> listMap = new ArrayList<Tuple2<String, Long>>();
if (isValid(Item.CDT))
listMap.add(new Tuple2<>("CCount", 1L));
else
listMap.add(new Tuple2<>("DCount", 1L));
return listMap.iterator();
})
.reduceByKey((x, y) -> x + y).collect();
for (Tuple2<String, Long> tpl : res) {
System.out.println(tpl._1 + ": " + String.valueOf(tpl._2));
}
}
public static boolean isValid(String val) {
return (val != null && !val.isEmpty());
}
}