hadoop-streaming: запись вывода в разные файлы - PullRequest
0 голосов
/ 10 октября 2011

Вот сценарий

           Reducer1  
         /  
Mapper - - Reducer2  
         \   
           ReducerN  

В редукторе я хочу записать данные в разные файлы, скажем, редуктор выглядит так:

def reduce():  
  for line in sys.STDIN:  
    if(line == type1):
      create_type_1_file(line)
    if(line == type2):
      create_type_2_file(line)
    if(line == type3):
      create_type3_file(line)
      ... and so on  
def create_type_1_file(line):
  # writes to file1  
def create_type2_file(line):
  # writes to file2  
def create_type_3_file(line):
  # write to file 3  

рассмотрим пути для записи как:

file1 = /home/user/data/file1  
file2 = /home/user/data/file2  
file3 = /home/user/data/file3  

Когда я запускаю в pseudo-distributed mode(machine with one node and hdfs daemons running), все хорошо, поскольку все демоны будут записывать в один и тот же набор файлов

Вопрос: - Если я запустлю это в кластере из 1000 машин,будут ли они писать в тот же набор файлов даже тогда?Я writing to local filesystem в этом случае
- есть ли лучший способ выполнить эту операцию в hadoop streaming?

Спасибо

Ответы [ 2 ]

0 голосов
/ 06 июля 2016

Вывод может быть записан из Редуктора в более чем одно местоположение с использованием класса MultipleOutputs. Вы можете рассматривать файл1, файл2 и файл3 как три папки и записывать выходные данные 1000 Редукторов в эти папки отдельно.


Шаблон использования для представления работы:

 Job job = new Job();

 FileInputFormat.setInputPath(job, inDir);

//outDir is the root path, in this case, outDir="/home/user/data/"
 FileOutputFormat.setOutputPath(job, outDir);

//You have to assign the output formatclass.Using MultipleOutputs in this way will still create zero-sized default output, eg part-00000. To prevent this use LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); instead of job.setOutputFormatClass(TextOutputFormat.class); in your Hadoop job configuration.

LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); 

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

 job.setMapperClass(MOMap.class);

 job.setReducerClass(MOReduce.class);

 ...

 job.waitForCompletion(true);

Использование в редукторе:

private MultipleOutputs out;

 public void setup(Context context) {

   out = new MultipleOutputs(context);

   ...

 }

 public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

//'/' characters in baseOutputPath will be translated into directory levels in your file system. Also, append your custom-generated path with "part" or similar, otherwise your output will be -00000, -00001 etc. No call to context.write() is necessary.
 for (Text line : values) {

    if(line == type1)
      out.write(key, new Text(line),"file1/part");

  else  if(line == type2)
      out.write(key, new Text(line),"file2/part");

 else   if(line == type3)
      out.write(key, new Text(line),"file3/part");
   }
 }

 protected void cleanup(Context context) throws IOException, InterruptedException {
       out.close();
   }

ref: https://hadoop.apache.org/docs/r2.6.3/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html

0 голосов
/ 11 октября 2011

Обычно O / P Reduce записывается в надежную систему хранения, такую ​​как HDFS, потому что, если один из узлов выходит из строя, тогда данные сокращения, связанные с этими узлами, теряются.Невозможно снова запустить эту конкретную задачу сокращения вне контекста платформы Hadoop.Кроме того, после завершения задания необходимо объединить операции ввода-вывода из 1000 узлов для различных типов ввода.

Параллельная запись не поддерживается в HDFS.Может быть случай, когда несколько редукторов могут записывать в один и тот же файл в HDFS, и это может повредить файл.Когда несколько задач сокращения выполняются на одном узле, параллелизм может быть проблемой при записи также в один локальный файл.

Одним из решений является имя файла конкретной задачи .и позже объедините все файлы для определенного типа ввода.

...