Как я должен обрабатывать многопоточность в Java? - PullRequest
0 голосов
/ 26 апреля 2009

Я работаю над практическим сценарием, связанным с Java, программой для сокетов. Существующая система и ожидаемая система следующие.

Существующая система - Система проверяет выполнение определенного условия. Если так, то это создаст какое-то сообщение для отправки и поместит его в очередь.

Процессор очереди является отдельным потоком. Периодически проверяется очередь на наличие предметов в ней. Если найдены какие-либо элементы (сообщения), он просто отправляет сообщение на удаленный хост (жестко закодировано) и удаляет элемент из очереди.

Ожидаемая система - Это что-то вроде этого. Сообщение создается, когда выполняется определенное условие, но в каждом случае получатель не совпадает. Так что есть много подходов.

  1. помещение сообщения в ту же очередь, но с идентификатором получателя. В этом случае 2-й поток может идентифицировать получателя, чтобы на него можно было отправить сообщение.

  2. Наличие нескольких потоков. В этом случае, когда условие выполнено и если получатель находится в состоянии «Новый», он создает новую очередь и помещает сообщение в эту очередь. И новый поток инициализируется для обработки этой очереди. Если последующие сообщения будут направлены тому же получателю, они должны быть помещены в ту же очередь, а если нет, то новая очередь и поток должны быть созданы.

Теперь я хочу реализовать 2-й, немного застрял. Как мне это сделать? Скелета было бы достаточно, и вам не нужно беспокоиться, чтобы узнать, как создавать очереди и т. Д.:)

Обновление: я также думаю, что подход 1 - лучший способ сделать это. Я прочитал несколько статей о потоках и пришел к этому решению. Но действительно стоит узнать, как реализовать подход 2.

Ответы [ 5 ]

4 голосов
/ 26 апреля 2009

Рассмотрите возможность использования Java Message Services (JMS) вместо повторного изобретения колеса?

2 голосов
/ 26 апреля 2009

Вы можете попробовать использовать SomnifugiJMS , реализацию JMS in-vm, использующую java.util.concurrent в качестве фактического «движка» своего рода.

Это, вероятно, будет несколько излишним для ваших целей, но вполне может позволить вашему приложению распространяться практически без дополнительного программирования (если применимо), вы просто подключаете другую реализацию JMS, такую ​​как ActiveMQ и все готово.

2 голосов
/ 26 апреля 2009

Вам нужно немного взвесить, если у вас много конечных машин и случайных сообщений каждому или несколько конечных машин и частых сообщений каждому.

Если у вас много конечных машин, то буквально наличие одного потока на конечную машину звучит немного громко, если вы не собираетесь постоянно передавать сообщения на все эти машины. Я бы предложил иметь пул потоков, которые будут расти только между определенными границами. Для этого вы можете использовать ThreadPoolExecutor. Когда вам нужно опубликовать сообщение, вы фактически отправляете исполняемый файл исполнителю, который отправит сообщение:

Executor msgExec = new ThreadPoolExecutor(...);

public void sendMessage(final String machineId, byte[] message) {
  msgExec.execute(new Runnable() {
    public void run() {
      sendMessageNow(machineId, message);
    }
  });
}

private void sendMessageNow(String machineId, byte[] message) {
  // open connection to machine and send message, thinking
  // about the case of two simultaneous messages to a machine,
  // and whether you want to cache connections.
}

Если у вас есть только несколько конечных компьютеров, то у вас может быть BlockingQueue на машину и поток на очередь блокировки, ожидающий следующего сообщения. В этом случае шаблон больше похож на этот (остерегайтесь непроверенного кода в воскресенье утром):

ConcurrentHashMap<String,BockingQueue> queuePerMachine;

public void sendMessage(String machineId, byte[] message) {
  BockingQueue<Message> q = queuePerMachine.get(machineId);
  if (q == null) {
    q = new BockingQueue<Message>();
    BockingQueue<Message> prev = queuePerMachine.putIfAbsent(machineId, q);
    if (prev != null) {
      q = prev;
    } else {
      (new QueueProessor(q)).start();
    }
  }
  q.put(new Message(message));
}

private class QueueProessor extends Thread {
  private final BockingQueue<Message> q;
  QueueProessor(BockingQueue<Message> q) {
    this.q = q;
  }
  public void run() {
    Socket s = null;
    for (;;) {
      boolean needTimeOut = (s != null);
      Message m = needTimeOut ?
         q.poll(60000, TimeUnit.MILLISECOND) :
         q.take();
      if (m == null) {
        if (s != null)
          // close s and null
      } else {
        if (s == null) {
          // open s
        }
        // send message down s
      }
    }
    // add appropriate error handling and finally
  }
}

В этом случае мы закрываем соединение, если в течение 60 секунд не пришло сообщение для этого устройства.

Стоит ли вместо этого использовать JMS? Ну, вы должны взвесить, кажется ли вам это сложным. Мое личное ощущение, что это не достаточно сложная задача, чтобы оправдать специальные рамки. Но я уверен, что мнения расходятся.

P.S. В действительности, теперь я смотрю на это, вы, вероятно, поместите очередь в объект потока и просто отобразите ID машины -> объект потока. В любом случае, вы поняли идею.

2 голосов
/ 26 апреля 2009

Могу ли я предложить вам посмотреть BlockingQueue ? Ваш процесс отправки может записать в эту очередь (положить), и клиенты могут принимать или просматривать потокобезопасным способом. Так что вам вообще не нужно писать реализацию очереди.

Если у вас есть одна очередь, содержащая сообщения разных типов, вам потребуется реализовать механизм peek-type для каждого клиента (т. Е. Им придется проверять заголовок очереди и брать только то, что принадлежит им). Для эффективной работы потребители должны будут своевременно и надежно извлекать необходимые для них данные.

Если у вас есть одна очередь / поток для каждого типа сообщения / потребителя, то это будет проще / надежнее.

Ваша клиентская реализация просто должна будет зацикливаться на:

while (!done) {
   Object item = queue.take();
   // process item
}

Обратите внимание, что очередь может использовать универсальные шаблоны, а take() блокируется.

Конечно, когда несколько потребителей принимают сообщения разных типов, вы можете рассмотреть космическую архитектуру . Это не будет иметь характеристик очереди (FIFO), но позволит вам легко использовать несколько потребителей.

1 голос
/ 26 апреля 2009

Прежде всего, если вы планируете иметь много приемников, я бы не стал использовать подход «ОДНА РЕЗЬБА-И-ОЧЕРЕДЬ-НА-ПРИЕМНИК». Вы можете закончить с множеством потоков, которые ничего не делают большую часть времени, и я мог бы навредить вашей производительности. Альтернативой является использование пула рабочих потоков, просто выбор задач из общей очереди, каждая задача со своим собственным идентификатором получателя и, возможно, общий словарь с сокетными подключениями к каждому получателю для использования рабочими потоками.

Сказав это, если вы все еще хотите продолжить свой подход, то вы можете сделать следующее:

1) Создайте новый класс для обработки вашего нового потока:

public class Worker implements Runnable {
   private Queue<String> myQueue = new Queue<String>();
   public void run()
   {
       while (true) {
          string messageToProcess = null;
          synchronized (myQueue) {
             if (!myQueue.empty()) {
                 // get your data from queue
                 messageToProcess = myQueue.pop();
             }
          }
          if (messageToProcess != null) {
             // do your stuff
          }
          Thread.sleep(500); // to avoid spinning
       }
   }
   public void queueMessage(String message)
   {
      synchronized(myQueue) {
         myQueue.add(message);
      }
   }
}

2) В главном потоке создайте сообщения и используйте словарь (хэш-таблицу), чтобы проверить, созданы ли потоки получателя. Если есть, просто поставьте в очередь новое сообщение. Если нет, создайте новый поток, поместите его в хеш-таблицу и поставьте в очередь новое сообщение:

while (true) {
   String msg = getNewCreatedMessage(); // you get your messages from here
   int id = getNewCreatedMessageId();   // you get your rec's id from here
   Worker w = myHash(id);
   if (w == null) {   // create new Worker thread
      w = new Worker();
      new Thread(w).start();
   }
   w.queueMessage(msg);
}

Удачи.

Редактировать: вы можете улучшить это решение, используя BlockingQueue Брайан, упомянутый с этим подходом.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...