Существует предположение, что вы будете иметь некоторое представление о том, как долго будут храниться данные, и прочитаете их за один раз.Также предполагается, что вы захотите повторно использовать буферы, чтобы избежать создания мусора.Для минимизации задержки данные обычно считываются в / из каналов NIO.
Я поднял проблему при создании этого примера. Улучшение поддержки Input / OutputStream и объектов, не относящихся к маршалу * https://github.com/OpenHFT/Chronicle-Wire/issues/111
Это должноДелайте то, что вы хотите, эффективно, не создавая мусор каждый раз.
package net.openhft.chronicle.wire;
import net.openhft.chronicle.bytes.Bytes;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
public class WireToOutputStream {
private final Bytes<ByteBuffer> bytes = Bytes.elasticHeapByteBuffer(128);
private final Wire wire;
private final DataOutputStream dos;
public WireToOutputStream(WireType wireType, OutputStream os) {
wire = wireType.apply(bytes);
dos = new DataOutputStream(os);
}
public Wire getWire() {
wire.clear();
return wire;
}
public void flush() throws IOException {
int length = Math.toIntExact(bytes.readRemaining());
dos.writeInt(length);
dos.write(bytes.underlyingObject().array(), 0, length);
}
}
package net.openhft.chronicle.wire;
import net.openhft.chronicle.bytes.Bytes;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StreamCorruptedException;
import java.nio.ByteBuffer;
public class InputStreamToWire {
private final Bytes<ByteBuffer> bytes = Bytes.elasticHeapByteBuffer(128);
private final Wire wire;
private final DataInputStream dis;
public InputStreamToWire(WireType wireType, InputStream is) {
wire = wireType.apply(bytes);
dis = new DataInputStream(is);
}
public Wire readOne() throws IOException {
wire.clear();
int length = dis.readInt();
if (length < 0) throw new StreamCorruptedException();
bytes.ensureCapacity(length);
byte[] array = bytes.underlyingObject().array();
dis.readFully(array, 0, length);
bytes.readPositionRemaining(0, length);
return wire;
}
}
Затем вы можете сделать следующее
package net.openhft.chronicle.wire;
import net.openhft.chronicle.core.util.ObjectUtils;
import org.junit.Test;
import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import static org.junit.Assert.assertEquals;
public class WireToOutputStreamTest {
@Test
public void testVisSocket() throws IOException {
ServerSocket ss = new ServerSocket(0);
Socket s = new Socket("localhost", ss.getLocalPort());
Socket s2 = ss.accept();
WireToOutputStream wtos = new WireToOutputStream(WireType.RAW, s.getOutputStream());
Wire wire = wtos.getWire();
AnObject ao = new AnObject();
ao.value = 12345;
ao.text = "Hello";
// write the type is needed.
wire.getValueOut().typeLiteral(AnObject.class);
Wires.writeMarshallable(ao, wire);
wtos.flush();
InputStreamToWire istw = new InputStreamToWire(WireType.RAW, s2.getInputStream());
Wire wire2 = istw.readOne();
Class type = wire2.getValueIn().typeLiteral();
Object ao2 = ObjectUtils.newInstance(type);
Wires.readMarshallable(ao2, wire2, true);
System.out.println(ao2);
ss.close();
s.close();
s2.close();
assertEquals(ao.toString(), ao2.toString());
}
public static class AnObject implements Serializable {
long value;
String text;
@Override
public String toString() {
return "AnObject{" +
"value=" + value +
", text='" + text + '\'' +
'}';
}
}
}
Пример кода
// On Sender side
Object m = ... ;
OutputStream out = ... ;
WireToOutputStream wireToOutputStream = new
WireToOutputStream(WireType.TEXT, out);
Wire wire = wireToOutputStream.getWire();
wire.getValueOut().typeLiteral(m.getClass());
Wires.writeMarshallable(m, wire);
wireToOutputStream.flush();
// On Receiver side
InputStream in = ... ;
InputStreamToWire inputStreamToWire = new InputStreamToWire(WireType.TEXT, in);
Wire wire2 = inputStreamToWire.readOne();
Class type = wire2.getValueIn().typeLiteral();
Object m = ObjectUtils.newInstance(type);
Wires.readMarshallable(m, wire2, true);
Этот код намного проще, если ваш DTO расширяет Marshallable
, но это будет работать независимо от того, расширяете ли вы интерфейс или нет.то есть вам не нужно расширять Serializable.
Кроме того, если вы знаете, какой тип будет вам, вам не нужно каждый раз писать его.
Добавленные классы-помощники были добавлены впоследний SNAPSHOT