Как построить переменную DataInput для передачи в функцию в Java? - PullRequest
0 голосов
/ 19 октября 2019

В настоящее время я решаю проблему с полосами подсчета биграмм, используя карту Hadoop Reduce. Мне дали класс HashMapStringIntWritable, который расширяет HashMap, для хранения «полос»

Мне необходимо вызвать функцию readFields (DataInput in), чтобы установить ключ и значение в HashMapStringIntWrite STRIPE. Однако, поскольку параметру нужен тип DataInput, я не могу просто напрямую сделать что-то вроде STRIPE.readFields («test», 1). Итак, как я могу инициализировать и отформатировать переменную DataInput, чтобы передать нужные данные в эту функцию?

Аналогично, мне также нужно извлечь данные в HashMapStringIntWrite STRIPE, вызвав write (DataOutput out), который использует out.writeInt (xxx) / out.writeUTF (xxx) для «возврата» ключей и значений. Но я не знаю, как записать эти данные в переменную типа int или string. Что-то вроде int key = write ()?

HashMapStringIntWritable class:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Writable;

public class HashMapStringIntWritable extends HashMap<String, Integer>
        implements Writable {
    /**
     * The serialization runtime associates with each serializable class a
     * version number, called a serialVersionUID, which is used during
     * deserialization to verify that the sender and receiver of a serialized
     * object have loaded classes for that object that are compatible with
     * respect to serialization.
     */
    private static final long serialVersionUID = -5222591318213533456L;

    /*
     * Creates StripeWritable
     */
    public HashMapStringIntWritable() {
        super();
    }

    /*
     * Deserializes HashMapStringIntWritable
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.clear();

        int numEntries = in.readInt();
        if (numEntries == 0) {
            return;
        }

        for (int i = 0; i < numEntries; i++) {
            String key = in.readUTF();
            int value = in.readInt();
            this.put(key, value);
        }
    }

    /*
     * Serializes HashMapStringIntWritable
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.size());
        if (this.size() == 0) {
            return;
        }

        for (Map.Entry<String, Integer> e : this.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeInt(e.getValue());
        }
    }

    /**
     * Increments the value of the key by one
     * 
     * @param key
     */
    public void increment(String key) {
        if (this.containsKey(key)) {
            this.put(key, this.get(key) + 1);
        } else {
            this.put(key, 1);
        }
    }

    /**
     * Increments the value of the key by inc
     * 
     * @param key
     * @param inc
     *            is the increments by which the value should be incremented
     */
    public void increment(String key, int inc) {
        if (this.containsKey(key)) {
            this.put(key, this.get(key) + inc);
        } else {
            this.put(key, inc);
        }
    }

    /**
     * Adds up two HashMapStringIntWritable
     * 
     * @param that
     */
    public void plus(HashMapStringIntWritable that) {
        for (Map.Entry<String, Integer> e : that.entrySet()) {
            String key = e.getKey();
            this.increment(key, e.getValue());
        }
    }

}

И мне нужно было бы вызвать функции readFields и прочитать в следующем коде:

/*
     * Mapper: emits <word, stripe> where stripe is a hash map
     */
    private static class MyMapper extends
            Mapper<LongWritable, Text, Text, HashMapStringIntWritable> {

        // Reuse objects to save overhead of object creation.
        private static final Text KEY = new Text();
        private static final HashMapStringIntWritable STRIPE = new HashMapStringIntWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = ((Text) value).toString();
            String[] words = line.trim().split("\\s+");

            /*
             * TODO: Your implementation goes here
             */

                        String prev = null;
                        for (String w: words) {
                // Skip empty words
                if (w.length() == 0 && prev == null) {
                    continue;
                }
                KEY.set(prev);
                                STRIPE.readFields(w);  // I'm not sure how to pass parameter to STRIPE, this line fails
                context.write(KEY, STRIPE);
            }
        }
    }


/*
     * Reducer: aggregate all stripes associated with each key
     */
    private static class MyReducer extends
            Reducer<Text, HashMapStringIntWritable, PairOfStrings, IntWritable> {

        // Reuse objects.
        private final static HashMapStringIntWritable SUM_STRIPES = new HashMapStringIntWritable();
        private final static PairOfStrings BIGRAM = new PairOfStrings();
        private final static IntWritable COUNT = new IntWritable();

        @Override
        public void reduce(Text key,
                Iterable<HashMapStringIntWritable> stripes, Context context)
                throws IOException, InterruptedException {
            /*
             * TODO: Your implementation goes here. Hint: You can add up two
             * stripes using the plus() method. Please refer to the
             * implementation of HashMapStringIntWritable for details.
             */
                        // Sum up values.
                        // Not finished since I am stuck in the DataInput DataOutput thing
            Iterator<HashMapStringIntWritable> iter = stripes.iterator();

            while (iter.hasNext()) {
                SUM_STRIPES = SUM_STRIPES.plus(iter.next().get());
            } 
            //context.write(key, SUM_STRIPES);

            /*
             * The output must be a sequence of key-value pairs of <bigram,
             * count>, the same as that of the "pairs" approach
             */

                        // I need to retrieve the data in SUM_STRIPE using read(DataInput in) here, but don't now how to save it / retrieve it to some variable

        }
    }



...