Мне было написано много заданий mapreduce в java, это первый раз, когда я использую python write mapper only для выполнения в потоке jar mapreduce. Проблема, которую я обнаружил, заключается в том, что выходной файл из программы Mapper содержит дублированные записи. Я провел некоторый анализ и выяснил, что дублированные записи происходят из задач карты, выполняющихся на том же узле. Например, у меня есть 100 CSV-файлов в папке hdfs в качестве ввода моей работы mapreduce, работа выполняется на кластере из 5 узлов. Задание даст 100 заданий на карту в зависимости от количества CSV-файлов. Каждый узел получит 20 картографических заданий. Задание выдаст 100 выходных файлов в выходной каталог hdfs, который выглядит как "part-000xx". Каждая задача my mapper будет считывать в один файл csv (1000 записей) и выдает 3000 записей в качестве вывода. Поэтому я должен ожидать, что каждый выходной файл генерирует 3000 записей. Но вместо этого я нашел 20 * 3000 = 60000 записей в каждом файле. Кажется, что все задачи сопоставления, выполняющиеся на одном и том же узле, будут записывать в один и тот же файл этого файла hdfs, а также все выходные файлы на одном и том же узле будут получать записи от всех задач сопоставления, выполняющихся на одном и том же узле. Я думаю, что это поведение потокового задания mapreduce, с которым я раньше не сталкивался при использовании Java.
Я попытался распечатать (датафрейм) и распечатать (ключ, значение) для каждой строки. Оба будут производить дублированные записи
hadoop jar /usr/hdp/3.1.0.0-78/hadoop-mapreduce/hadoop-streaming.jar -files / home / xxxxxx / pythonfiles -Dmapreduce.job.queuename = queuename -Dyarn.scheduler.minimum-alloc- mb = 1024 -Dmapreduce.job.reduces = 0 -Dmapreduce.map.memory.mb = 4096 -Dmapreduce.map.java.opts = -Xmx3277m -Dmapreduce.task.timeout = 0 -input / user / uxxxxx / input -file /home/uxxxxx/mapper.py -mapper "/ apps / bin / python /home/uxxxxxx/mapper.py" -output / user / uxxxxxxx / mytest
Я ожидаю, что каждый выходной файл содержит 3000 записей из каждой задачи сопоставления. Есть ли какая-либо конфигурация, которую я могу установить, чтобы изменить это поведение? Или я должен ограничить число одновременных задач сопоставления, выполняющихся на одном узле?