Я немного изменил код, чтобы уменьшить создание объекта.В последней версии 5.17.1 я могу многократно перезапускать производителя, а потребитель продолжает читать данные.
ПРИМЕЧАНИЕ. Если вы собираетесь писать текст, лучше использовать writeText
метод.
Если вы хотите написать что-то более сложное, я предлагаю использовать Wire или каждый MethodReader / MethodWriters, которые позволяют вам выполнять вызовы метода интерфейса.
package net.openhft.chronicle.queue;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.threads.Pauser;
import java.nio.ByteBuffer;
public class IpcTest {
private static final String DIR = "chronicle-test";
public static final void startProducer() {
System.out.println("starting producer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
ExcerptAppender appender = queue.acquireAppender();
Bytes<ByteBuffer> bytes = Bytes.elasticByteBuffer(8192);
ByteBuffer ipcBuffer = bytes.underlyingObject();
for (int i = 0; i < Integer.MAX_VALUE; i++) {
ipcBuffer.clear();
ipcBuffer.put(("data" + i).getBytes());
bytes.readPositionRemaining(0, ipcBuffer.position());
appender.writeBytes(bytes);
Jvm.pause(1);
}
}
public static final void startConsumer() {
System.out.println("starting consumer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
ExcerptTailer tailer = queue.createTailer().toEnd(); // skip to end, don't read old messages
Bytes<ByteBuffer> bytes = Bytes.elasticHeapByteBuffer(8192);
Pauser pauser = Pauser.balanced();
while (true) {
try {
long ipcIndex = tailer.index();
bytes.clear();
boolean read = tailer.readBytes(bytes);
if (read) {
byte[] data = bytes.underlyingObject().array();
int len = (int) bytes.readRemaining();
System.out.println("read " + new String(data, 0, 0, len));
pauser.reset();
} else {
pauser.pause();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(final String[] args) {
if ("producer".equals(args[0]))
startProducer();
else
startConsumer();
}
}
Using MethodReader / MethodWriter
public class IpcTest {
interface Hello {
void hello(String text);
}
private static final String DIR = "chronicle-test";
public static final void startProducer() {
System.out.println("starting producer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
Hello hello = queue.methodWriter(Hello.class);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
hello.hello("data" + i);
Jvm.pause(1);
}
}
public static final void startConsumer() {
System.out.println("starting consumer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
Hello hello = text -> System.out.println("read " + text);
MethodReader reader = queue.createTailer().methodReader(hello);
Pauser pauser = Pauser.balanced();
while (true) {
if (reader.readOne()) {
pauser.reset();
} else {
pauser.pause();
}
}
}
public static void main(final String[] args) {
if ("producer".equals(args[0]))
startProducer();
else
startConsumer();
}
}
Вы можете использовать DTO со значением AbstractMarshallable
, чтобы сделать его эффективным для сериализации и десериализации.
package net.openhft.chronicle.queue;
import net.openhft.chronicle.bytes.MethodReader;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.wire.AbstractMarshallable;
public class IpcTest {
static class Hi extends AbstractMarshallable {
String text;
int value;
}
interface Hello {
void hi(Hi hi);
}
private static final String DIR = "chronicle-test";
public static final void startProducer() {
System.out.println("starting producer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
Hello hello = queue.methodWriter(Hello.class);
Hi hi = new Hi();
for (int i = 0; i < Integer.MAX_VALUE; i++) {
hi.text = "data";
hi.value = i;
hello.hi(hi);
Jvm.pause(1);
}
}
public static final void startConsumer() {
System.out.println("starting consumer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
Hello hello = text -> System.out.println("read " + text);
MethodReader reader = queue.createTailer().methodReader(hello);
Pauser pauser = Pauser.balanced();
while (true) {
if (reader.readOne()) {
pauser.reset();
} else {
pauser.pause();
}
}
}
public static void main(final String[] args) {
ClassAliasPool.CLASS_ALIASES.addAlias(Hi.class);
if ("producer".equals(args[0]))
startProducer();
else
startConsumer();
}
}
В этом случае потребитель печатает
....
read !Hi {
text: data,
value: 3862
}
read !Hi {
text: data,
value: 3863
}
read !Hi {
text: data,
value: 3864
}
....