Закройте сокет, когда все ожидающие сообщения были отправлены - PullRequest
1 голос
/ 09 ноября 2010

У меня есть этот метод для записи сообщений в сокет:

public void sendMessage(byte[] msgB) {
    try {
        synchronized (writeLock) {
            log.debug("Sending message (" + msgB.length + "): " + HexBytes.toHex(msgB));
            ous.write(HEADER_MSG);
            ous.writeInt(msgB.length);
            ous.write(msgB);
            ous.flush();
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

Теперь поток с именем Боб хотел бы закрыть сокет в какой-то неопределенный момент X , что означает, что все еще могут быть потоки, ожидающие writeLock, чтобы отправить свое сообщение, и даже может быть один поток в середине его написания.

Я могу решить последнее, разрешив Боб получает writeLock перед закрытием сокета, но я все еще могу потерять сообщения, которые еще не начали отправку, потому что, насколько я знаю, synchronized нечестно, Боб может получить блокировкудо какого-то другого потока, который ждал дольше.

Мне нужно, чтобы все вызовы, сделанные на sendMessage до X , выполняли свою работу нормально, а вызовы, сделанные после X выбросить ошибку.Как я могу это сделать?

  • Особенности: Bob - это чтение потока из входного потока сокета, а X - при получении сообщения "close"в этом потоке.

Ответы [ 3 ]

2 голосов
/ 09 ноября 2010

Рассмотрите возможность использования однопоточного ExecutorService для написания сообщений. Отправляющие потоки просто пытаются «отправить» свое сообщение, вызывая execute(Runnable) или submit(Callable). Если вы хотите прекратить отправку сообщений, вы выключаете ExecutorService (shutdown()), в результате чего последующие вызовы для отправки / выполнения приводят к RejectedExecutionException.

Преимущество этого подхода состоит в том, что у вас есть только один поток, связанный с вводом-выводом, и меньше конфликтов блокировки, чем если бы у вас было несколько потоков, ожидающих написания самих сообщений. Это также лучшее разделение интересов.

Вот краткий пример того, что ОО-проблема решается немного подробнее:

public interface Message {
  /**
   * Writes the message to the specified stream.
   */
  void writeTo(OutputStream os);
}

public class Dispatcher {
  private final ExecutorService sender;
  private final OutputStream out;

  public Dispatcher() {
    this.sender = Executors.newSingleThreadExecutor();
    this.out = ...; // Set up output stream.
  }

  /**
   * Enqueue message to be sent.  Return a Future to allow calling thread
   * to perform a blocking get() if they wish to perform a synchronous send.
   */
  public Future<?> sendMessage(final Message msg) {
    return sender.submit(new Callable<Void>() {
      public Void call() throws Exception {
        msg.writeTo(out);
        return null;
      }
    });
  }

  public void shutDown() {
    sender.shutdown(); // Waits for all tasks to finish sending.

    // Close quietly, swallow exception.
    try {
      out.close();
    } catch (IOException ex) {
    }
  }
}
2 голосов
/ 09 ноября 2010

Вы можете использовать исполнителя здесь. Поскольку каждое отправляемое сообщение синхронизировано (я полагаю, на общедоступном объекте), вы можете использовать ограничение потока.

static final ExecutorService executor = Executors.newSingleThreadExecutor();

public void sendMessage(byte[] msgB) {
    executor.submit(new Runnable() {
        public void run() {
            try {
                log.debug("Sending message (" + msgB.length + "): " + HexBytes.toHex(msgB));
                ous.write(HEADER_MSG);
                ous.writeInt(msgB.length);
                ous.write(msgB);
                ous.flush();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    });
}
public static void atMomentX(){
    executor.shutdown();
}

По завершении другой поток может вызвать atMomentX ();

Из javadoc метод выключения говорит:

Инициирует упорядоченное отключение, при котором ранее представленные задачи выполнено, но новых задач не будет принято. Призыв не имеет дополнительный эффект, если он уже закрыт вниз.

1 голос
/ 09 ноября 2010

Полагаю, я мог бы заменить синхронизированный блок на ReentrantLock , установленный как справедливый.

...