Closed Channel Exception, копирующий локальную файловую систему в HDFS - PullRequest
0 голосов
/ 28 сентября 2019

Я пытаюсь скопировать все файлы в каталоге в локальной файловой системе в папку Hadoop.Я запускаю установку с одним узлом на моей домашней машине с виртуальной машиной Ubuntu.Это многопоточное приложение (ThreadPoolExecutor), но я не думаю, что это вызывает проблему, с которой я сталкиваюсь.Когда я запускаю его с помощью команды hdfs.close (), я получаю исключения из нескольких закрытых каналов.Я как-то неправильно закрываю каналы?

И еще одна проблема, с которой я сталкиваюсь, заключается в том, что эти файлы вообще не сжимаются.Выходные файлы немного больше, чем оригиналы.Файлы - это просто случайные байты, сгенерированные из CLI с использованием dd.Эти типы файлов не сжимаются?

    public void run(){

        for (int i = 0; i< f.length; i++){
            FileSystem hdfs = null;
            FileInputStream is = null;
            OutputStream os = null;
            CompressionCodec compCodec = null;
            CompressionOutputStream compOs = null;

            try{
                String fname = f[i].getName();
                hdfs = FileSystem.get(URI.create(dest), config);
                is = new FileInputStream(f[i].getAbsolutePath());
                //InputStream is = new BufferedInputStream(fin);
                os = hdfs.create(new Path(dest + '/' + fname), true);
                System.out.println(dest+ '/' + fname);
                compCodec = new GzipCodec();                
                compOs = compCodec.createOutputStream(os);
                IOUtils.copyBytes(is, compOs, config);                

            }catch(IOException e){
                e.printStackTrace();
            }
            finally{
                try{
                    compOs.finish();
                    IOUtils.closeStream(is);
                    IOUtils.closeStream(compOs);
                    is.close();
                    os.close();
                    //hdfs.close();
                }catch(IOException e){
                    e.printStackTrace();
                }
            }
        }

Редактировать: Трассировка стека

java.nio.channels.ClosedChannelException
        at org.apache.hadoop.hdfs.ExceptionLastSeen.throwException4Close(ExceptionLastSeen.java:73)
        at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:153)
        at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:105)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
        at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
        at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
        at org.apache.hadoop.io.compress.GzipCodec$GzipOutputStream$ResetableGZIPOutputStream.write(GzipCodec.java:77)
        at org.apache.hadoop.io.compress.GzipCodec$GzipOutputStream.write(GzipCodec.java:118)
        at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:96)
        at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:68)
        at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:114)
        at edu.jhu.bdpuh.hdfs.ThreadedCopy.run(ThreadedCopy.java:49)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
...