/ 25 марта 2019

Я пишу программу Mapreduce для обработки изображений Dicom.Целью этой программы Mapreduce является обработка изображения dicom, извлечение из него метаданных, индексирование в solr и, наконец, на этапе Reducer сохранение исходного изображения в формате hdf.Я хочу сохранить тот же файл в HDFS, что и выход редуктора

Таким образом, я достиг большей части функциональности, но в фазе редуктора при сохранении того же файла в hdfs он не работает.

Я протестировал обработанный файл Dicom с помощью средства просмотра изображений dicom, и он говорит, что файл обрезан, а также размер увеличенного файла dicom немного увеличен. Пример. Исходный размер Dicom составляет 628 КБ, а когда редуктор сохраняет этот файл в формате hdf, его размер изменяется на 630 КБ.

Я пробовал решение по этим ссылкам, но ни одна из них не дает ожидаемых результатов.

Hadoop mapReduce Как сохранить только значения в HDFS

Hadoop - Как собрать вывод текста без значений

Воткод для чтения файла Dicom как одного файла (без разбиения его).

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>{

    protected boolean isSplitable(JobContext context, Path filename) {
        return false;

    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;

Custom RecordReader

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable>{

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {     
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();     

    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            System.out.println("Inside nextKeyvalue");
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                processed = true;
                return true;
            return false;

    public void close() throws IOException {


    public NullWritable getCurrentKey() throws IOException, InterruptedException 
        return NullWritable.get();

    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;

    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;


Mapper Class TheКласс картографа работает идеально в соответствии с нашими потребностями.

public class MapClass{

    public static class Map extends Mapper<NullWritable, BytesWritable, Text, BytesWritable>{   

        protected void map(NullWritable key, BytesWritable value,
                Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context)
                throws IOException, InterruptedException {
            InputStream in = new ByteArrayInputStream(value.getBytes());            
            ProcessDicom.metadata(in); // Process dicom image and extract metadata from it
            Text keyOut = getFileName(context);
            context.write(keyOut, value);


        private Text getFileName(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context)
            InputSplit spl = context.getInputSplit();
            Path filePath = ((FileSplit)spl).getPath();
            String fileName = filePath.getName();
            Text text = new Text(fileName);
            return text;

        protected void setup(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context)
                throws IOException, InterruptedException {


Класс редуктора Это класс редуктора.Открытый класс ReduceClass {

    public static class Reduce extends Reducer<Text, BytesWritable, BytesWritable, BytesWritable>{

            protected void reduce(Text key, Iterable<BytesWritable> value,
                    Reducer<Text, BytesWritable, BytesWritable, BytesWritable>.Context context)
                    throws IOException, InterruptedException {

            Iterator<BytesWritable> itr = value.iterator();
                BytesWritable wr = itr.next();
                context.write(new BytesWritable(key.copyBytes()), itr.next());

Основной класс

public class DicomIndexer{

    public static void main(String[] argss) throws Exception{
        String args[] = {"file:///home/b3ds/storage/dd","hdfs://"};

    public static void run(String[] args) throws Exception {

        //Initialize the Hadoop job and set the jar as well as the name of the Job
        Configuration conf = new Configuration();
        Job job = new Job(conf, "WordCount");
//      job.getConfiguration().set("mapreduce.output.basename", "hi");


        WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));




Так что я совершенно не знаю, что делать.Некоторые из ссылок говорят, что это невозможно, так как Mapreduce работает на пару, а некоторые говорят, что нужно использовать NullWritable.До сих пор я пробовал NullWritable, SequenceFileOutputFormat, но ни один из них не работает.

/ 25 марта 2019

Две вещи:

  1. Вы непреднамеренно потребляете два элемента за раз в редукторе, дважды вызывая itr.next(), что не может помочь.

  2. Как вы определили, вы пишете ключ и значение, когда хотите написать только один. Вместо этого используйте NullWritable для значения. Ваш редуктор будет выглядеть так:

    public static class Reduce extends Reducer<Text, BytesWritable, BytesWritable, NullWritable>{
        protected void reduce(Text key, Iterable<BytesWritable> value,
                              Reducer<Text, BytesWritable, BytesWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            NullWritable nullWritable = NullWritable.get();
            Iterator<BytesWritable> itr = value.iterator();
                BytesWritable wr = itr.next();
                context.write(wr, nullWritable);