Я использую следующие подходы. (Мой подход JOIN очень похож, но этот метод не копирует поведение diff с реплицированными строками). Как это было задано некоторое время назад, возможно, вы использовали только один редуктор, так как Pig получил алгоритм для регулировки числа редукторов в 0,8?
- Оба подхода, которые я использую, работают с точностью до нескольких процентов друг от друга, но не обрабатывают дубликаты одинаково
- Подход JOIN сворачивает дубликаты (поэтому, если один файл имеет больше дубликатов, чем другой, этот подход не будет выводить дубликат)
- Подход UNION работает как инструмент Unix
diff
(1) и возвращает правильное количество дополнительных дубликатов для правильного файла
- В отличие от инструмента Unix
diff
(1), порядок не важен (эффективный подход JOIN выполняет sort -u <foo.txt> | diff
, в то время как UNION выполняет sort <foo> | diff)
- Если у вас невероятное количество (~ тысяч) повторяющихся строк, то все замедляется из-за объединений (если позволяет использование, сначала выполните DISTINCT для необработанных данных)
- Если ваши строки очень длинные (например, размером> 1 КБ), то рекомендуется использовать DataFu MD5 UDF и использовать только разницу по хешам, а затем СОЕДИНИТЬ с исходными файлами, чтобы получить исходную строку назад перед выводом
Использование JOIN:
SET job.name 'Diff(1) Via Join'
-- Erase Outputs
rmf first_only
rmf second_only
-- Process Inputs
a = LOAD 'a.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS First: chararray;
b = LOAD 'b.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS Second: chararray;
-- Combine Data
combined = JOIN a BY First FULL OUTER, b BY Second;
-- Output Data
SPLIT combined INTO first_raw IF Second IS NULL,
second_raw IF First IS NULL;
first_only = FOREACH first_raw GENERATE First;
second_only = FOREACH second_raw GENERATE Second;
STORE first_only INTO 'first_only' USING PigStorage();
STORE second_only INTO 'second_only' USING PigStorage();
Использование UNION:
SET job.name 'Diff(1)'
-- Erase Outputs
rmf first_only
rmf second_only
-- Process Inputs
a_raw = LOAD 'a.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS Row: chararray;
b_raw = LOAD 'b.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS Row: chararray;
a_tagged = FOREACH a_raw GENERATE Row, (int)1 AS File;
b_tagged = FOREACH b_raw GENERATE Row, (int)2 AS File;
-- Combine Data
combined = UNION a_tagged, b_tagged;
c_group = GROUP combined BY Row;
-- Find Unique Lines
%declare NULL_BAG 'TOBAG(((chararray)\'place_holder\',(int)0))'
counts = FOREACH c_group {
firsts = FILTER combined BY File == 1;
seconds = FILTER combined BY File == 2;
GENERATE
FLATTEN(
(COUNT(firsts) - COUNT(seconds) == (long)0 ? $NULL_BAG :
(COUNT(firsts) - COUNT(seconds) > 0 ?
TOP((int)(COUNT(firsts) - COUNT(seconds)), 0, firsts) :
TOP((int)(COUNT(seconds) - COUNT(firsts)), 0, seconds))
)
) AS (Row, File); };
-- Output Data
SPLIT counts INTO first_only_raw IF File == 1,
second_only_raw IF File == 2;
first_only = FOREACH first_only_raw GENERATE Row;
second_only = FOREACH second_only_raw GENERATE Row;
STORE first_only INTO 'first_only' USING PigStorage();
STORE second_only INTO 'second_only' USING PigStorage();
Производительность
- Разница между 200 ГБ (1 055 687 930 строк) при использовании сжатого ввода LZO с 18 узлами занимает примерно 10 минут.
- Каждый подход занимает только один цикл Map / Reduce.
- В результате получается примерно 1,8 Гбайт на узел в минуту (не большая пропускная способность, но в моей системе кажется, что
diff
(1) работает только в памяти, в то время как Hadoop использует потоковые диски.