В настоящее время я решаю проблему с полосами подсчета биграмм, используя карту Hadoop Reduce. Мне дали класс HashMapStringIntWritable, который расширяет HashMap, для хранения «полос»
Мне необходимо вызвать функцию readFields (DataInput in), чтобы установить ключ и значение в HashMapStringIntWrite STRIPE. Однако, поскольку параметру нужен тип DataInput, я не могу просто напрямую сделать что-то вроде STRIPE.readFields («test», 1). Итак, как я могу инициализировать и отформатировать переменную DataInput, чтобы передать нужные данные в эту функцию?
Аналогично, мне также нужно извлечь данные в HashMapStringIntWrite STRIPE, вызвав write (DataOutput out), который использует out.writeInt (xxx) / out.writeUTF (xxx) для «возврата» ключей и значений. Но я не знаю, как записать эти данные в переменную типа int или string. Что-то вроде int key = write ()?
HashMapStringIntWritable class:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.Writable;
public class HashMapStringIntWritable extends HashMap<String, Integer>
implements Writable {
/**
* The serialization runtime associates with each serializable class a
* version number, called a serialVersionUID, which is used during
* deserialization to verify that the sender and receiver of a serialized
* object have loaded classes for that object that are compatible with
* respect to serialization.
*/
private static final long serialVersionUID = -5222591318213533456L;
/*
* Creates StripeWritable
*/
public HashMapStringIntWritable() {
super();
}
/*
* Deserializes HashMapStringIntWritable
*/
@Override
public void readFields(DataInput in) throws IOException {
this.clear();
int numEntries = in.readInt();
if (numEntries == 0) {
return;
}
for (int i = 0; i < numEntries; i++) {
String key = in.readUTF();
int value = in.readInt();
this.put(key, value);
}
}
/*
* Serializes HashMapStringIntWritable
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(this.size());
if (this.size() == 0) {
return;
}
for (Map.Entry<String, Integer> e : this.entrySet()) {
out.writeUTF(e.getKey());
out.writeInt(e.getValue());
}
}
/**
* Increments the value of the key by one
*
* @param key
*/
public void increment(String key) {
if (this.containsKey(key)) {
this.put(key, this.get(key) + 1);
} else {
this.put(key, 1);
}
}
/**
* Increments the value of the key by inc
*
* @param key
* @param inc
* is the increments by which the value should be incremented
*/
public void increment(String key, int inc) {
if (this.containsKey(key)) {
this.put(key, this.get(key) + inc);
} else {
this.put(key, inc);
}
}
/**
* Adds up two HashMapStringIntWritable
*
* @param that
*/
public void plus(HashMapStringIntWritable that) {
for (Map.Entry<String, Integer> e : that.entrySet()) {
String key = e.getKey();
this.increment(key, e.getValue());
}
}
}
И мне нужно было бы вызвать функции readFields и прочитать в следующем коде:
/*
* Mapper: emits <word, stripe> where stripe is a hash map
*/
private static class MyMapper extends
Mapper<LongWritable, Text, Text, HashMapStringIntWritable> {
// Reuse objects to save overhead of object creation.
private static final Text KEY = new Text();
private static final HashMapStringIntWritable STRIPE = new HashMapStringIntWritable();
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = ((Text) value).toString();
String[] words = line.trim().split("\\s+");
/*
* TODO: Your implementation goes here
*/
String prev = null;
for (String w: words) {
// Skip empty words
if (w.length() == 0 && prev == null) {
continue;
}
KEY.set(prev);
STRIPE.readFields(w); // I'm not sure how to pass parameter to STRIPE, this line fails
context.write(KEY, STRIPE);
}
}
}
/*
* Reducer: aggregate all stripes associated with each key
*/
private static class MyReducer extends
Reducer<Text, HashMapStringIntWritable, PairOfStrings, IntWritable> {
// Reuse objects.
private final static HashMapStringIntWritable SUM_STRIPES = new HashMapStringIntWritable();
private final static PairOfStrings BIGRAM = new PairOfStrings();
private final static IntWritable COUNT = new IntWritable();
@Override
public void reduce(Text key,
Iterable<HashMapStringIntWritable> stripes, Context context)
throws IOException, InterruptedException {
/*
* TODO: Your implementation goes here. Hint: You can add up two
* stripes using the plus() method. Please refer to the
* implementation of HashMapStringIntWritable for details.
*/
// Sum up values.
// Not finished since I am stuck in the DataInput DataOutput thing
Iterator<HashMapStringIntWritable> iter = stripes.iterator();
while (iter.hasNext()) {
SUM_STRIPES = SUM_STRIPES.plus(iter.next().get());
}
//context.write(key, SUM_STRIPES);
/*
* The output must be a sequence of key-value pairs of <bigram,
* count>, the same as that of the "pairs" approach
*/
// I need to retrieve the data in SUM_STRIPE using read(DataInput in) here, but don't now how to save it / retrieve it to some variable
}
}