Как десериализовать блоки байтового массива без объединения в один большой массив - PullRequest
1 голос
/ 21 марта 2012

У меня есть клиент и сервер, обменивающиеся данными через удаленное взаимодействие Spring (с использованием сериализации Java) через проприетарную систему обмена сообщениями. Мой сервер возвращает большие объекты, поэтому моя реализация удаленного взаимодействия Spring разбивает сериализованный байтовый массив объектов на блоки и отправляет несколько сообщений. Клиент ожидает все ответные сообщения для данного запроса и в конечном итоге вызывает метод ниже для десериализации массивов байтов в результирующий объект.

protected Object deserialize(List<byte[]> blocks) {
    try {
        ByteArrayOutputStream os = new ByteArrayOutputStream(blocks.size() * blockSize);
        for (byte[] b : blocks) {
            os.write(b, 0, b.length);
        }
        ByteArrayInputStream is = new ByteArrayInputStream(os.toByteArray());
        ObjectInputStream objInputStream = new ObjectInputStream(is);
        return objInputStream.readObject();
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}

Это работает отлично. Однако память очень тяжелая. Предполагая, что объект в памяти имеет примерно такой же размер, как его сериализованный байтовый массив в памяти, я получаю в 3 раза больше размера моего объекта в памяти:

  1. List<byte[]>, содержащий блоки
  2. ByteArrayOutputStream, содержащий каскадный байтовый массив (и, возможно, другой, потому что ByteArrayOutputStream.toByteArray() копирует массив).
  3. Полученный Объект

Как только этот метод возвращает все массивы могут быть GC'd, но во время этого вызова метода есть большой всплеск использования памяти.

Итак, на мой вопрос: есть ли способ, которым я могу создать входной поток блокирующих байтов, к которому я могу добавлять байтовые массивы, когда я их получаю? ObjectOutputStream (в отдельном потоке) считывает доступные байты, затем блокирует их до тех пор, пока не будет записано больше байтов, и продолжит работу до полной десериализации объекта. Таким образом, мне никогда не нужно иметь полный конкатенированный байтовый массив в памяти. Кажется, что ни одна из стандартных реализаций потока не подходит, я не понимаю, как бы я использовал NIO для этого, и я бы не стал писать свою собственную реализацию потока, если бы там было достаточно.

Большое спасибо, Ian

Ответы [ 2 ]

2 голосов
/ 21 марта 2012

реализовать свой собственный входной поток для уменьшения накладных расходов массива

protected Object deserialize(final List<byte[]> blocks) {
    try {
       ObjectInputStream objInputStream = new ObjectInputStream(InputStream(){
            Iterator<byte[]> it=blocks.iterator();
            byte[] curr;
            int ind;
            public int read(){
                if(curr==null||curr.length==ind){
                    if(!it.hasNext())return -1;//or use a blocking queue and pop
                    curr=it.next();
                    ind=0;
                }
                return curr[ind++];
            }
        });
        return objInputStream.readObject();
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}

Конечно, вы также должны переопределить read(byte[],int,int) также для эффективности, но это будет работать, если немного медленно

Или Вы можете использовать комбо PipedInputStream и PipedOutputStream для того, что вы действительно хотите. Входной поток будет блокироваться до тех пор, пока ему не будет что прочитать

0 голосов
/ 22 марта 2012

Просто для полноты ниже приведена моя новая (тестовая) реализация клиента, который десериализует блоки с сервера по мере их поступления в объект, используя потоки Piped, как предложено @rachetfreak.Спасибо!

public static class Client implements Runnable {
    private final PipedInputStream deserializationInputStream = new PipedInputStream(BLOCK_SIZE);
    private final PipedOutputStream deserializationOutputStream;

    public Client() throws IOException {
        deserializationOutputStream = new PipedOutputStream(deserializationInputStream);
    }

    /** Called by messaging system when a message is received */
    public void onReceive(byte[] block) throws Exception {
        deserializationOutputStream.write(block);
    }

    public Object readObject() throws Exception {
        ObjectInputStream objectInputStream = new ObjectInputStream(deserializationInputStream);
        Object readObject = objectInputStream.readObject();
        objectInputStream.close();
        return readObject;
    }

    @Override
    public void run() {
        try {
            Object readObject = readObject();
            System.out.println("read: " + readObject);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...