Могу ли я вызвать XMPPConnection.sendPacket из параллельных потоков? - PullRequest
5 голосов
/ 18 сентября 2009

Мотивация

Мне нужны дополнительные глаза, чтобы подтвердить, что я могу вызвать этот метод XMPPConnection.sendPacket ( Пакет) одновременно. Для моего текущего кода я вызываю Список вызываемых (максимум 3) последовательным способом. Каждый вызываемый объект отправляет / получает пакеты XMPP на одном фрагменте XMPPConnection. Я планирую распараллелить эти Callables путем выделения нескольких потоков, и каждый Callable будет вызывать sendPacket для общего XMPPConnection без синхронизации.

XMPPConnection

class XMPPConnection
{
    private boolean connected = false;

    public boolean isConnected() 
    {
        return connected;
    }

    PacketWriter packetWriter;

    public void sendPacket( Packet packet ) 
    {
        if (!isConnected())
            throw new IllegalStateException("Not connected to server.");

        if (packet == null) 
            throw new NullPointerException("Packet is null.");

        packetWriter.sendPacket(packet);
    }
}

PacketWriter

class PacketWriter
{
    public void sendPacket(Packet packet) 
    {
        if (!done) {
            // Invoke interceptors for the new packet 
            // that is about to be sent. Interceptors
            // may modify the content of the packet.
            processInterceptors(packet);

            try {
                queue.put(packet);
            }
            catch (InterruptedException ie) {
                ie.printStackTrace();
                return;
            }
            synchronized (queue) {
                queue.notifyAll();
            }

            // Process packet writer listeners. Note that we're 
            // using the sending thread so it's expected that 
            // listeners are fast.
            processListeners(packet);
    }

    protected PacketWriter( XMPPConnection connection ) 
    {
        this.queue = new ArrayBlockingQueue<Packet>(500, true);
        this.connection = connection;
        init();
    }
}

Что я делаю вывод

Поскольку PacketWriter использует BlockingQueue, у меня нет проблем с намерением вызвать sendPacket из нескольких потоков. Я прав?

Ответы [ 3 ]

2 голосов
/ 26 мая 2011

Да, вы можете отправлять пакеты из разных потоков без проблем.

Очередь блокировки Smack заключается в том, что вы не можете позволить разным потокам одновременно записывать выходной поток. Smack берет на себя ответственность за синхронизацию выходного потока, записывая его с гранулярностью каждого пакета.

Шаблон, реализованный в Smack, является просто типичным шаблоном параллелизма производителя / потребителя. У вас может быть несколько производителей (ваших потоков) и только один потребитель (PacketWriter Smack работает в своем собственном потоке).

Привет.

0 голосов
/ 15 декабря 2009

Вы можете рассмотреть возможность использования BlockingQueue, если вы можете ограничиться Java 5+.

Из документации по API Java с небольшим изменением использования ArrayBlockingQueue:

class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while(true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while(true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     BlockingQueue q = new ArrayBlockingQueue();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }

Для вашего использования ваш реальный отправитель (владелец фактического соединения) будет потребителем, а составители / отправители пакетов - производителями.

Интересной дополнительной мыслью является то, что вы могли бы использовать PriorityBlockingQueue, чтобы разрешить флэш-переопределение пакетов XMPP, отправляемых перед любыми другими ожидающими пакетами.

Кроме того, очки Глена на дизайне - хорошие моменты. Возможно, вы захотите взглянуть на Smack API (http://www.igniterealtime.org/projects/smack/), а не создавать свой собственный.

0 голосов
/ 22 сентября 2009

Вы не предоставили достаточно информации здесь.

Мы не знаем, как реализовано следующее:

  • processInterceptors
  • processListeners

Кто читает / пишет переменную done? Если один поток задает для него значение true, то все остальные потоки будут молча терпеть неудачу.

На первый взгляд, это не выглядит потокобезопасным, но нет никакого способа точно сказать, что вы написали.

Другие вопросы:

  • Почему PacketWriter является членом класса XMPPConnection, когда он используется только в одном методе?
  • Почему PacketWriter имеет член XMPPConnection var и не использует его?
...