Буферизованные фоновые реализации InputStream - PullRequest
8 голосов
/ 28 января 2010

Я написал фоновые InputStreamOutputStream) реализации, которые обертывают другие потоки и читают вперед в фоновом потоке, в первую очередь позволяя распаковке / сжатию происходить в разных потоках от обработки распакованного потока.

Это довольно стандартная модель производителя / потребителя.

Это кажется простым способом эффективно использовать многоядерные процессоры с простыми процессами, которые читают, обрабатывают и записывают данные, позволяяболее эффективное использование ресурсов процессора и диска.Возможно, слово «эффективный» - не лучшее слово, но оно обеспечивает более высокое использование и, что меня больше интересует, сокращение времени выполнения по сравнению с чтением непосредственно из ZipInputStream и записью непосредственно в ZipOutputStream.

Я рад опубликовать код, но мой вопрос заключается в том, переизобретаю ли я что-то легкодоступное в существующих (и более загруженных) библиотеках?

Редактирование - публикациякод ...

Мой код для BackgroundInputStream находится ниже (BackgroundOutputStream очень похож), но есть некоторые аспекты, которые я хотел бы улучшить.

  1. Похоже, я слишком много работаю, чтобы передавать буферы назад и вперед.
  2. Если вызывающий код выбрасывает ссылки на BackgroundInputStream, backgroundReaderThread будет зависать вечно.
  3. Сигнализация eof нуждается в улучшении.
  4. Исключения следует распространять на поток переднего плана.
  5. Я хотел бы разрешить использование потока из предоставленного Executor.
  6. Метод close() должен сигнализировать о фоновом потокеи не должен закрывать завернутый поток, так как завернутый поток должен принадлежать фоновому потоку, который читает из него.
  7. Делать глупые вещи, такие как чтение после закрытия, нужно соответствующим образом.

package nz.co.datacute.io;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

public class BackgroundInputStream extends InputStream {
    private static final int DEFAULT_QUEUE_SIZE = 1;
    private static final int DEFAULT_BUFFER_SIZE = 64*1024;
    private final int queueSize;
    private final int bufferSize;
    private volatile boolean eof = false;
    private LinkedBlockingQueue<byte[]> bufferQueue;
    private final InputStream wrappedInputStream;
    private byte[] currentBuffer;
    private volatile byte[] freeBuffer;
    private int pos;

    public BackgroundInputStream(InputStream wrappedInputStream) {
        this(wrappedInputStream, DEFAULT_QUEUE_SIZE, DEFAULT_BUFFER_SIZE);
    }

    public BackgroundInputStream(InputStream wrappedInputStream,int queueSize,int bufferSize) {
        this.wrappedInputStream = wrappedInputStream;
        this.queueSize = queueSize;
        this.bufferSize = bufferSize;
    }

    @Override
    public int read() throws IOException {
        if (bufferQueue == null) {
            bufferQueue = new LinkedBlockingQueue<byte[]>(queueSize);
            BackgroundReader backgroundReader = new BackgroundReader();
            Thread backgroundReaderThread = new Thread(backgroundReader, "Background InputStream");
            backgroundReaderThread.start();
        }
        if (currentBuffer == null) {
            try {
                if ((!eof) || (bufferQueue.size() > 0)) {
                    currentBuffer = bufferQueue.take();
                    pos = 0;
                } else {
                    return -1;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        int b = currentBuffer[pos++];
        if (pos == currentBuffer.length) {
            freeBuffer = currentBuffer;
            currentBuffer = null;
        }
        return b;
    }

    @Override
    public int available() throws IOException {
        if (currentBuffer == null) return 0;
        return currentBuffer.length;
    }

    @Override
    public void close() throws IOException {
        wrappedInputStream.close();
        currentBuffer = null;
        freeBuffer = null;
    }

    class BackgroundReader implements Runnable {

        @Override
        public void run() {
            try {
                while (!eof) {
                    byte[] newBuffer;
                    if (freeBuffer != null) {
                        newBuffer = freeBuffer;
                        freeBuffer = null;
                    } else {
                        newBuffer = new byte[bufferSize];
                    }
                    int bytesRead = 0;
                    int writtenToBuffer = 0;
                    while (((bytesRead = wrappedInputStream.read(newBuffer, writtenToBuffer, bufferSize - writtenToBuffer)) != -1) && (writtenToBuffer < bufferSize)) {
                        writtenToBuffer += bytesRead;
                    }
                    if (writtenToBuffer > 0) {
                        if (writtenToBuffer < bufferSize) {
                            newBuffer = Arrays.copyOf(newBuffer, writtenToBuffer);
                        }
                        bufferQueue.put(newBuffer);
                    }
                    if (bytesRead == -1) {
                        eof = true;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

Ответы [ 2 ]

3 голосов
/ 28 января 2010

Звучит интересно. Я никогда не сталкивался с чем-либо, что делает это «из коробки», но имеет смысл попробовать использовать простое ядро ​​для сжатия, если оно доступно.

Возможно, вы могли бы использовать Commons I / O - это хорошо протестированная библиотека, которая может помочь справиться с некоторыми из более скучных вещей и позволить вам сосредоточиться на расширении классных параллельных частей. Может быть, вы даже могли бы внести свой код в проект Commons; -)

0 голосов
/ 02 февраля 2010

Мне было бы интересно. Я продумал похожий проект, но не смог понять, как обрабатывать фрагменты, которые заканчивают сжатие не по порядку.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...