НИО Труба выбрасывает «Сломанная труба», когда я пишу в мойку без причины! Как отлаживать? - PullRequest
0 голосов
/ 25 марта 2012

Мне кажется, я все сделал правильно. Я создаю канал, передаю приемник потоку писателя, регистрирую источник на моем селекторе с помощью OP_READ, запускаю мой селектор. Все работает, но как только я что-то пишу в мойку, я получаю исключение из-за разбитой трубы. Зачем !!!??? Здесь нет сломанной трубы. Я расстроен. Как я могу отладить / понять, что здесь происходит? У кого-нибудь есть простой пример канала, который я могу запустить, чтобы проверить, работает ли он. Нить, пишущая на раковине, и селектор, читающий ее.

РЕДАКТИРОВАТЬ: Я в значительной степени следовал предложению здесь . Трудно найти конкретные примеры труб NIO в Интернете.

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

public class SystemOutPipe extends Thread {

  public static void main(String[] args)
  {
    try {
      SystemOutPipe sop = new SystemOutPipe();
      sop.start();
      System.out.println("This message should be redirected to System.err\nNow waiting 5 seconds ...");
      Thread.sleep(5000L);
      sop.setStopped(true);
      sop.join();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  private Selector selector;
  private Pipe pipe;
  private boolean stopped = false;

  public SystemOutPipe() throws IOException {
    super("SystemOutPipe");
    pipe = Pipe.open();
    System.setOut(new PrintStream(new PipeOutputStream(pipe)));
    selector = Selector.open();
    pipe.source().configureBlocking(false);
    pipe.source().register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
  }

  @Override
  public void run() {
    try {
      while (!isStopped()) {
        int n = selector.select(1L);
        if (n > 0) {
          Iterator<SelectionKey> it = selector.selectedKeys().iterator();
          while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            if (key.isReadable()) {
              new ReadHandler(key).run();
            }
          }
        }
      }
    } catch (Exception e) {
      e.printStackTrace(); // writes to System.err !
    }
  }

  public synchronized boolean isStopped() {
    return stopped;
  }

  public synchronized void setStopped(final boolean stopped) {
    this.stopped = stopped;
  }

  public class ReadHandler implements Runnable {
    private final SelectionKey key;

    public ReadHandler(final SelectionKey key) {
      this.key = key;
    }

    @Override
    public void run() {
      ByteBuffer bbuf = (ByteBuffer) key.attachment();
      ReadableByteChannel channel = (ReadableByteChannel) key.channel();
      try
      {
        int count = 0;
        do {
          bbuf.clear();
          count = channel.read(bbuf);
          if (count > 0) System.err.write(bbuf.array(), 0, count);
        } while(count > 0);
      } catch (IOException e) {
        e.printStackTrace();
        key.cancel();
      }
    }
  }

  public class PipeOutputStream extends OutputStream {
    private final Pipe pipe;

    public PipeOutputStream(final Pipe pipe) {
      this.pipe = pipe;
    }

    @Override
    public void write(final int b) throws IOException {
      write(new byte[] { (byte) b });
    }

    @Override
    public void write(final byte[] b) throws IOException {
      write(b, 0, b.length);
    }

    @Override
    public void write(final byte[] b, final int off, final int len) throws IOException {
      ByteBuffer bbuf = ByteBuffer.wrap(b, off, len);
      bbuf.position(len);
      bbuf.flip();
      int count = 0;
      while (count < len) {
        int n = pipe.sink().write(bbuf);
        if (n == 0) {
          // let's wait a bit and not consume cpu
          try {
            Thread.sleep(1L);
          } catch (InterruptedException e) {
            throw new IOException(e);
          }
        }
        else count += n;
      }
    }
  }
}

ИСКЛЮЧЕНИЕ:

java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcher.write0(Native Method)
    at sun.nio.ch.FileDispatcher.write(FileDispatcher.java:39)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:72)
    at sun.nio.ch.IOUtil.write(IOUtil.java:43)
    at sun.nio.ch.SinkChannelImpl.write(SinkChannelImpl.java:149)
    at com.niostuff.util.GCLogInterceptor.fileModified(GCLogInterceptor.java:180)
    at net.contentobjects.jnotify.linux.JNotifyAdapterLinux$WatchData.notifyFileModified(Unknown Source)
    at net.contentobjects.jnotify.linux.JNotifyAdapterLinux.notifyChangeEvent(Unknown Source)
    at net.contentobjects.jnotify.linux.JNotifyAdapterLinux$1.notify(Unknown Source)
    at net.contentobjects.jnotify.linux.JNotify_linux.callbackProcessEvent(Unknown Source)
    at net.contentobjects.jnotify.linux.JNotify_linux.nativeNotifyLoop(Native Method)
    at net.contentobjects.jnotify.linux.JNotify_linux.access$000(Unknown Source)
    at net.contentobjects.jnotify.linux.JNotify_linux$1.run(Unknown Source)

1 Ответ

0 голосов
/ 25 марта 2012

Хорошо, я нашел проблему.Во-первых, спасибо всем, кто пытается помочь.Надеюсь, вы узнаете из моей ошибки.Цепочка событий была:

1 - я не опустошал приемный буфер (тот, в который читает исходный канал), и он в итоге заполнился.

2 - Теперь, когда он полон,pipeSourceChannel.read (readBuffer) возвращает 0 байтов.Есть данные для чтения, но они не могут читать в полном буфере.

3 - Это привело к закрытию канала (я сам делал это в bytesRead == 0) и BrokenPipe.

Один урок, который я усвоил здесь: ТРУБЫ хитрые.Я бы подумал, что неблокирующие параллельные очереди гораздо проще использовать, как этот парень здесь однажды упомянул: Java NIO Pipe vs BlockingQueue

...