Хроника очереди Tailer останавливается при перезапуске appender - PullRequest
0 голосов
/ 04 декабря 2018

В приведенном ниже коде, перезапуск процесса Tailer в порядке.Тем не менее, перезапуск процесса appender приводит к тому, что tailer не получает больше сообщений.Есть ли способ перезапустить приложение и оставить канал открытым?

Отредактировано: ниже приведен полный класс, который я использовал для постоянного воссоздания проблемы.Среда: Ubuntu 18ronicle-queue-5.16.9.jar

1) производитель java com.tradeplacer.util.IpcTest

2) потребитель java com.tradeplacer.util.IpcTest

3) убить производителя

4) перезапустить производителя

5) заметить, что потребитель больше ничего не читает

package com.tradeplacer.util;

import java.nio.ByteBuffer;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;

public class IpcTest {
  private static final String DIR = "chronicle-test";

  public static final void startProducer() {
    new Thread() {
      public void run() {
    System.out.println("starting producer...");
    ChronicleQueue queue = ChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
    ExcerptAppender appender = queue.acquireAppender();
    ByteBuffer ipcBuffer = ByteBuffer.allocate(8192);

    for (int i = 0; i < Integer.MAX_VALUE; i++) {
      ipcBuffer.clear();
      ipcBuffer.put(("data" + i).getBytes());
      Bytes<ByteBuffer> bbb = Bytes.wrapForWrite(ipcBuffer);
      appender.writeBytes(bbb);
      try {
        Thread.sleep(1);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
      }
    }.start();
  }

  public static final void startConsumer() {
    new Thread() {
      public void run() {
    System.out.println("starting consumer...");
    ChronicleQueue queue = ChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
    ExcerptTailer tailer = queue.createTailer().toEnd(); // skip to end, don't read old messages
    Bytes bytes = Bytes.allocateDirect(8192);

    while (true) {
      try {
        long ipcIndex = tailer.index();
        boolean read = tailer.readBytes(bytes);
        int len = bytes.length();
        byte[] data = new byte[len];
        bytes.read(data);
        if (read) {
          System.out.println("read " + data);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }

      }
    }.start();
  }

  public static void main(final String[] args) {
    if ("producer".equals(args[0]))
      startProducer();
    else
      startConsumer();
  }
}

1 Ответ

0 голосов
/ 05 декабря 2018

Я немного изменил код, чтобы уменьшить создание объекта.В последней версии 5.17.1 я могу многократно перезапускать производителя, а потребитель продолжает читать данные.

ПРИМЕЧАНИЕ. Если вы собираетесь писать текст, лучше использовать writeText метод.

Если вы хотите написать что-то более сложное, я предлагаю использовать Wire или каждый MethodReader / MethodWriters, которые позволяют вам выполнять вызовы метода интерфейса.

package net.openhft.chronicle.queue;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.threads.Pauser;

import java.nio.ByteBuffer;

public class IpcTest {
    private static final String DIR = "chronicle-test";

    public static final void startProducer() {
        System.out.println("starting producer...");
        ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
        ExcerptAppender appender = queue.acquireAppender();
        Bytes<ByteBuffer> bytes = Bytes.elasticByteBuffer(8192);
        ByteBuffer ipcBuffer = bytes.underlyingObject();

        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            ipcBuffer.clear();
            ipcBuffer.put(("data" + i).getBytes());
            bytes.readPositionRemaining(0, ipcBuffer.position());
            appender.writeBytes(bytes);

            Jvm.pause(1);
        }
    }

    public static final void startConsumer() {
        System.out.println("starting consumer...");
        ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
        ExcerptTailer tailer = queue.createTailer().toEnd(); // skip to end, don't read old messages
        Bytes<ByteBuffer> bytes = Bytes.elasticHeapByteBuffer(8192);
        Pauser pauser = Pauser.balanced();
        while (true) {
            try {
                long ipcIndex = tailer.index();
                bytes.clear();
                boolean read = tailer.readBytes(bytes);
                if (read) {
                    byte[] data = bytes.underlyingObject().array();
                    int len = (int) bytes.readRemaining();
                    System.out.println("read " + new String(data, 0, 0, len));
                    pauser.reset();
                } else {
                    pauser.pause();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(final String[] args) {
        if ("producer".equals(args[0]))
            startProducer();
        else
            startConsumer();
    }
}

Using MethodReader / MethodWriter

public class IpcTest {

    interface Hello {
        void hello(String text);
    }

    private static final String DIR = "chronicle-test";

    public static final void startProducer() {
        System.out.println("starting producer...");
        ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
        Hello hello = queue.methodWriter(Hello.class);

        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            hello.hello("data" + i);
            Jvm.pause(1);
        }
    }

    public static final void startConsumer() {
        System.out.println("starting consumer...");
        ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
        Hello hello = text -> System.out.println("read " + text);
        MethodReader reader = queue.createTailer().methodReader(hello);
        Pauser pauser = Pauser.balanced();
        while (true) {
            if (reader.readOne()) {
                pauser.reset();
            } else {
                pauser.pause();
            }
        }
    }

    public static void main(final String[] args) {
        if ("producer".equals(args[0]))
            startProducer();
        else
            startConsumer();
    }
}

Вы можете использовать DTO со значением AbstractMarshallable, чтобы сделать его эффективным для сериализации и десериализации.

package net.openhft.chronicle.queue;

import net.openhft.chronicle.bytes.MethodReader;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.wire.AbstractMarshallable;

public class IpcTest {

    static class Hi extends AbstractMarshallable {
        String text;
        int value;
    }

    interface Hello {
        void hi(Hi hi);
    }

    private static final String DIR = "chronicle-test";

    public static final void startProducer() {
        System.out.println("starting producer...");
        ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
        Hello hello = queue.methodWriter(Hello.class);
        Hi hi = new Hi();
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            hi.text = "data";
            hi.value = i;
            hello.hi(hi);
            Jvm.pause(1);
        }
    }

    public static final void startConsumer() {
        System.out.println("starting consumer...");
        ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
        Hello hello = text -> System.out.println("read " + text);
        MethodReader reader = queue.createTailer().methodReader(hello);
        Pauser pauser = Pauser.balanced();
        while (true) {
            if (reader.readOne()) {
                pauser.reset();
            } else {
                pauser.pause();
            }
        }
    }

    public static void main(final String[] args) {
        ClassAliasPool.CLASS_ALIASES.addAlias(Hi.class);
        if ("producer".equals(args[0]))
            startProducer();
        else
            startConsumer();
    }
}

В этом случае потребитель печатает

....
read !Hi {
  text: data,
  value: 3862
}

read !Hi {
  text: data,
  value: 3863
}

read !Hi {
  text: data,
  value: 3864
}
....
...