У меня есть работа только с Mapper PrepareData, которая необходима для преобразования текстовых данных в SequencialFile с VLongWritable как ключ и DoubleArrayWritable как значение .
Когда я запускаю его поверх 455000x90 (~ 384 Мб) данных со строками, например:
13,124,123.12,12.12, ... 1,12
23.12,1.5,12.6, ... 6.123
...
в локальном режиме, который в среднем занимает:
- 51 секунда на Athlon 64 X2 Dual Core 5600+, 2,79Ггц;
- 54 секунды на процессоре Athlon 64 3700+, 1Ггц;
=> 52-53 секунды в среднем.
но когда я запускаю его в реальном кластере на этих 2 машинах (Athlon 64 X2 Dual Core 5600+, 3700+), в лучшем случае это занимает 81 секунду.
Задание выполнено с 4 картографами (размер блока ~ 96 МБ) и 2 редукторами.
Кластер работает от Hadoop 0.21.0 , настроен для повторного использования jvm.
Mapper
public class PrepareDataMapper
extends Mapper<LongWritable, Text, VLongWritable, DoubleArrayWritable> {
private int size;
// hint
private DoubleWritable[] doubleArray;
private DoubleArrayWritable mapperOutArray = new DoubleArrayWritable();
private VLongWritable mapOutKey = new VLongWritable();
@Override
protected void setup(Context context) throws IOException {
Configuration conf = context.getConfiguration();
size = conf.getInt("dataDimSize", 0);
doubleArray = new DoubleWritable[size];
for (int i = 0; i < size; i++) {
doubleArray[i] = new DoubleWritable();
}
}
@Override
public void map(
LongWritable key,
Text row,
Context context) throws IOException, InterruptedException {
String[] fields = row.toString().split(",");
for (int i = 0; i < size; i++) {
doubleArray[i].set(Double.valueOf(fields[i]));
}
mapperOutArray.set(doubleArray);
mapOutKey.set(key.get());
context.write(mapOutKey, mapperOutArray);
}
}
DoubleArrayWritable
public class DoubleArrayWritable extends ArrayWritable {
public DoubleArrayWritable() {
super(DoubleWritable.class);
}
public DoubleArrayWritable(DoubleWritable[] values) {
super(DoubleWritable.class, values);
}
public void set(DoubleWritable[] values) {
super.set(values);
}
public DoubleWritable get(int idx) {
return (DoubleWritable) get()[idx];
}
public double[] getVector(int from, int to) {
int sz = to - from + 1;
double[] vector = new double[sz];
for (int i = from; i <= to; i++) {
vector[i-from] = get(i).get();
}
return vector;
}
}