Вычислительный набор пересечений и набор разностей записей двух файлов с помощью hadoop - PullRequest
5 голосов
/ 24 июня 2011

Извините за перекрестную публикацию этого сообщения в списке рассылки пользователей hadoop и здесь, но это становится для меня неотложным вопросом.

Моя проблема заключается в следующем: У меня есть два входных файла, и я хочу определить

  • а) Количество строк, которые встречаются только в файле 1
  • б) Количество строк, которые встречаются только в файле 2
  • c) Количество линий, общих для обеих (например, в отношении равенства строк)

Пример:

File 1:
a
b
c

File 2:
a
d

Желаемый вывод для каждого случая:

lines_only_in_1: 2         (b, c)
lines_only_in_2: 1         (d)
lines_in_both:   1         (a)

В основном мой подход заключается в следующем: Я написал свой собственный LineRecordReader, чтобы маппер получал пару, состоящую из строки (текста) и байта, указывающего исходный файл (0 или 1). Картограф снова возвращает пару, так что на самом деле он ничего не делает. Тем не менее, побочным эффектом является то, что комбинатор получает

Map<Line, Iterable<SourceId>>

(где SourceId равен 0 или 1).

Теперь для каждой строки я могу получить набор источников, в которых она появляется. Поэтому я мог бы написать сумматор, который подсчитывает для каждого случая (a, b, c) количество строк (Листинг 1)

Объединитель затем выводит «сводку» только при очистке (это безопасно?). Итак, это резюме выглядит так:

lines_only_in_1   2531
lines_only_in_2   3190
lines_in_both      901

В редукторе я только суммирую значения для этих сводок. (Таким образом, выход редуктора выглядит так же, как выход объединителя).

Однако основная проблема заключается в том, что мне нужно рассматривать оба исходных файла как один виртуальный файл, который выдает записи вида (line, sourceId) // sourceId 0 или 1

И я не уверен, как этого добиться. Поэтому вопрос заключается в том, могу ли я избежать предварительной обработки и слияния файлов заранее, и сделать это на лету с помощью чего-то вроде виртуально слитного считывателя файлов и специального считывателя записей. Любой пример кода высоко ценится.

С уважением, Клаус

Листинг 1:

public static class SourceCombiner
    extends Reducer<Text, ByteWritable, Text, LongWritable> {

    private long countA = 0;
    private long countB = 0;
    private long countC = 0; // C = lines (c)ommon to both sources

    @Override
    public void reduce(Text key, Iterable<ByteWritable> values, Context context) throws IOException, InterruptedException {
        Set<Byte> fileIds = new HashSet<Byte>();
        for (ByteWritable val : values) {
            byte fileId = val.get();

            fileIds.add(fileId);
        }

        if(fileIds.contains((byte)0)) { ++countA; }
        if(fileIds.contains((byte)1)) { ++countB; }
        if(fileIds.size() >= 2) { ++countC; }
    }

    protected void cleanup(Context context)
            throws java.io.IOException, java.lang.InterruptedException
    {
        context.write(new Text("in_a_distinct_count_total"), new LongWritable(countA));
        context.write(new Text("in_b_distinct_count_total"), new LongWritable(countB));
        context.write(new Text("out_common_distinct_count_total"), new LongWritable(countC));
    }
}

1 Ответ

2 голосов
/ 24 июня 2011

Хорошо, я должен признать, что я не совсем понял суть того, что вы пробовали до сих пор, но у меня есть простой подход, чтобы сделать то, что вам может понадобиться.

Посмотрите на файл-картограф. Этот получит имя файла и отправит его с каждой строкой ввода.

    public class FileMapper extends Mapper<LongWritable, Text, Text, Text> {

        static Text fileName;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, fileName);
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {

            String name = ((FileSplit) context.getInputSplit()).getPath().getName();
            fileName = new Text(name);
        }
    }

Теперь у нас есть набор ключей / значений, которые выглядят так (в отношении вашего примера)

    a File 1
    b File 1
    c File 1

    a File 2
    d File 2

Очевидно, что уменьшение их даст вам такой ввод:

    a File 1,File 2
    b File 1
    c File 1
    d File 2

То, что вам нужно сделать в вашем редукторе, может выглядеть так:

public class FileReducer extends Reducer<Text, Text, Text, Text> {

    enum Counter {
        LINES_IN_COMMON, LINES_IN_FIRST, LINES_IN_SECOND
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        HashSet<String> set = new HashSet<String>();
        for (Text t : values) {
            set.add(t.toString());
        }

        // if we have only two files and we have just two records in our hashset
        // the line is contained in both files
        if (set.size() == 2) {
            context.getCounter(Counter.LINES_IN_COMMON).increment(1);
        } else {
            // sorry this is a bit dirty...
            String t = set.iterator().next();
            // determine which file it was by checking for the name:
            if(t.toString().equals("YOUR_FIRST_FILE_NAME")){
                context.getCounter(Counter.LINES_IN_FIRST).increment(1);
            } else {
                context.getCounter(Counter.LINES_IN_SECOND).increment(1);
            }
        }
    }

}

Вы должны заменить строку внутри оператора if на ваши имена файлов.

Я думаю, что использование счетчика заданий немного понятнее, чем использование собственных примитивов и их запись в контекст при очистке. Вы можете получить счетчики для задания, вызвав этот материал после завершения:

Job job = new Job(new Configuration());
//setup stuff etc omitted..
job.waitForCompletion(true);
// do the same line with the other enums
long linesInCommon = job.getCounters().findCounter(Counter.LINES_IN_COMMON).getValue();

Тем не менее, если вам нужно общее количество строк и т. Д. В вашей HDFS, тогда перейдите к вашему решению.

Надеюсь, это помогло вам.

...