Извините за перекрестную публикацию этого сообщения в списке рассылки пользователей hadoop и здесь, но это становится для меня неотложным вопросом.
Моя проблема заключается в следующем:
У меня есть два входных файла, и я хочу определить
- а) Количество строк, которые встречаются только в файле 1
- б) Количество строк, которые встречаются только в файле 2
- c) Количество линий, общих для обеих (например, в отношении равенства строк)
Пример:
File 1:
a
b
c
File 2:
a
d
Желаемый вывод для каждого случая:
lines_only_in_1: 2 (b, c)
lines_only_in_2: 1 (d)
lines_in_both: 1 (a)
В основном мой подход заключается в следующем:
Я написал свой собственный LineRecordReader, чтобы маппер получал пару, состоящую из строки (текста) и байта, указывающего исходный файл (0 или 1).
Картограф снова возвращает пару, так что на самом деле он ничего не делает.
Тем не менее, побочным эффектом является то, что комбинатор получает
Map<Line, Iterable<SourceId>>
(где SourceId равен 0 или 1).
Теперь для каждой строки я могу получить набор источников, в которых она появляется. Поэтому я мог бы написать сумматор, который подсчитывает для каждого случая (a, b, c) количество строк (Листинг 1)
Объединитель затем выводит «сводку» только при очистке (это безопасно?).
Итак, это резюме выглядит так:
lines_only_in_1 2531
lines_only_in_2 3190
lines_in_both 901
В редукторе я только суммирую значения для этих сводок. (Таким образом, выход редуктора выглядит так же, как выход объединителя).
Однако основная проблема заключается в том, что мне нужно рассматривать оба исходных файла как один виртуальный файл, который выдает записи вида
(line, sourceId) // sourceId 0 или 1
И я не уверен, как этого добиться.
Поэтому вопрос заключается в том, могу ли я избежать предварительной обработки и слияния файлов заранее, и сделать это на лету с помощью чего-то вроде виртуально слитного считывателя файлов и специального считывателя записей.
Любой пример кода высоко ценится.
С уважением,
Клаус
Листинг 1:
public static class SourceCombiner
extends Reducer<Text, ByteWritable, Text, LongWritable> {
private long countA = 0;
private long countB = 0;
private long countC = 0; // C = lines (c)ommon to both sources
@Override
public void reduce(Text key, Iterable<ByteWritable> values, Context context) throws IOException, InterruptedException {
Set<Byte> fileIds = new HashSet<Byte>();
for (ByteWritable val : values) {
byte fileId = val.get();
fileIds.add(fileId);
}
if(fileIds.contains((byte)0)) { ++countA; }
if(fileIds.contains((byte)1)) { ++countB; }
if(fileIds.size() >= 2) { ++countC; }
}
protected void cleanup(Context context)
throws java.io.IOException, java.lang.InterruptedException
{
context.write(new Text("in_a_distinct_count_total"), new LongWritable(countA));
context.write(new Text("in_b_distinct_count_total"), new LongWritable(countB));
context.write(new Text("out_common_distinct_count_total"), new LongWritable(countC));
}
}