Прерывание потока не прекращает блокирующий вызов при чтении входного потока - PullRequest
10 голосов
/ 02 октября 2010

Я использую RXTX для чтения данных с последовательного порта. Чтение выполняется в потоке, созданном следующим образом:

CommPortIdentifier portIdentifier = CommPortIdentifier.getPortIdentifier(port);
CommPort comm = portIdentifier.open("Whatever", 2000);
SerialPort serial = (SerialPort)comm;
...settings
Thread t = new Thread(new SerialReader(serial.getInputStream()));
t.start();

Класс SerialReader реализует Runnable и просто зацикливается бесконечно, считывая данные из порта и собирая данные в полезные пакеты перед отправкой в ​​другие приложения. Однако я сократил его до следующей простоты:

public void run() {
  ReadableByteChannel byteChan = Channels.newChannel(in); //in = InputStream passed to SerialReader
  ByteBuffer buffer = ByteBuffer.allocate(100);
  while (true) {
    try {
      byteChan.read(buffer);
    } catch (Exception e) {
      System.out.println(e);
    }
  }
}

Когда пользователь нажимает кнопку остановки, запускается следующая функция, которая теоретически должна закрыть входной поток и выйти из блокирующего вызова byteChan.read (buffer). Код выглядит следующим образом:

public void stop() {
  t.interrupt();
  serial.close();
}

Однако, когда я запускаю этот код, я никогда не получаю исключение ClosedByInterruptException, которое ДОЛЖНО запускаться после закрытия входного потока. Кроме того, выполнение блокируется при вызове serial.close () - потому что основной входной поток все еще блокирует вызов read. Я попытался заменить вызов прерывания на byteChan.close (), который затем должен вызвать AsynchronousCloseException, однако я получаю те же результаты.

Любая помощь в том, что мне не хватает, будет принята с благодарностью.

Ответы [ 3 ]

7 голосов
/ 02 октября 2010

Вы не можете сделать поток, который не поддерживает прерываемый ввод / вывод, в InterruptibleChannel, просто обернув его (и, в любом случае, ReadableByteChannel не расширяет InterruptibleChannel).

Вы должны посмотреть на контракт базового InputStream.Что SerialPort.getInputStream() говорит о прерываемости его результата?Если он ничего не говорит, вы должны предположить, что он игнорирует прерывания.

Для любого ввода-вывода, который явно не поддерживает прерывание, единственная опция, как правило, закрывает поток из другого потока.Это может немедленно вызвать IOException (хотя это не может быть AsynchronousCloseException) в потоке, заблокированном при вызове в поток.

Однако даже это в значительной степени зависит от реализации InputStream, и базовая ОС также может быть фактором.


Обратите внимание на комментарий исходного кода к ReadableByteChannelImpl класс, возвращаемый newChannel():

  private static class ReadableByteChannelImpl
    extends AbstractInterruptibleChannel       // Not really interruptible
    implements ReadableByteChannel
  {
    InputStream in;
    ⋮
4 голосов
/ 13 октября 2010

RXTX SerialInputStream (что возвращается вызовом serial.getInputStream ()) поддерживает схему тайм-аута, которая в конечном итоге решает все мои проблемы. Добавление следующего перед созданием нового объекта SerialReader приводит к тому, что чтение больше не блокируется бесконечно:

serial.enableReceiveTimeout(1000);

В объекте SerialReader мне пришлось изменить несколько вещей для чтения непосредственно из InputStream вместо создания ReadableByteChannel, но теперь я могу без проблем остановить и перезапустить считыватель.

1 голос
/ 30 ноября 2010

Я использую код ниже, чтобы выключить rxtx.Я запускаю тесты, которые запускают их и выключают, и кажется, что все в порядке.мой читатель выглядит так:

private void addPartsToQueue(final InputStream inputStream) {
    byte[] buffer = new byte[1024];
    int len = -1;
    boolean first = true;
    // the read can throw
    try {
        while ((len = inputStream.read(buffer)) > -1) {
            if (len > 0) {
                if (first) {
                    first = false;
                    t0 = System.currentTimeMillis();
                } else
                    t1 = System.currentTimeMillis();
                final String part = new String(new String(buffer, 0, len));
                queue.add(part);
                //System.out.println(part + " " + (t1 - t0));
            }
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException e) {
                //System.out.println(Thread.currentThread().getName() + " interrupted " + e);
                break;
            }
        }
    } catch (IOException e) {
        System.err.println(Thread.currentThread().getName() + " " + e);
        //if(interruSystem.err.println(e);
        e.printStackTrace();
    }
    //System.out.println(Thread.currentThread().getName() + " is ending.");
}

спасибо

public void shutdown(final Device device) {
    shutdown(serialReaderThread);
    shutdown(messageAssemblerThread);
    serialPort.close();
    if (device != null)
        device.setSerialPort(null);
}

public static void shutdown(final Thread thread) {
    if (thread != null) {
        //System.out.println("before intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
        thread.interrupt();
        //System.out.println("after intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " was interrupted trying to sleep after interrupting" + thread.getName() + " " + e);
        }
        //System.out.println("before join() on thread " + thread.getName() + ", it's state is " + thread.getState());
        try {
            thread.join();
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " join interruped");
        }
        //System.out.println(Thread.currentThread().getName() + " after join() on thread " + thread.getName() + ", it's state is" + thread.getState());
    }
...