Связь между производителями и потребителями - PullRequest
6 голосов
/ 21 декабря 2011

возникли проблемы с межпотоковой связью и "решил" ее, используя "фиктивные сообщения" повсюду. Это плохая идея? Каковы возможные решения?

Пример проблемы у меня есть.

основной поток запускает поток для обработки и вставки записей в базу данных. Основной поток читает возможно большой файл и помещает одну запись (объект) за другой в очередь блокировки. поток обработки читает из очереди и работает.

Как мне сказать "обработка потока", чтобы остановить? Очередь может быть пустой, но работа не завершена, и основной поток теперь не работает, когда поток обработки завершил работу и не может ее прервать.

Так обрабатывает поток

while (queue.size() > 0 || !Thread.currentThread().isInterrupted()) {
    MyObject object= queue.poll(100, TimeUnit.MILLISECONDS);
    if (object != null) {
        String data = object.getData();
        if (data.equals("END")) {
            break;
        }
    // do work
    }
}
// clean-up
synchronized queue) {
    queue.notifyAll();
}
return;

и основная нить

// ...start processing thread...
while(reader.hasNext(){
    // ...read whole file and put data in queue...
}
MyObject dummy = new MyObject();
dummy.setData("END");
queue.put(dummy);
//Note: empty queue here means work is done
while (queue.size() > 0) {
    synchronized (queue) {
        queue.wait(500); // over-cautios locking prevention i guess
    }
}

Обратите внимание, что вставка должна быть в одной транзакции, и транзакция не может быть обработана по основной теме.

Что было бы лучшим способом сделать это? (Я учусь и не хочу начинать "делать это неправильно")

Ответы [ 3 ]

4 голосов
/ 21 декабря 2011

Эти фиктивные сообщения действительны. Это называется "яд". Нечто, что производитель посылает потребителю, чтобы он остановился.

Другая возможность - вызвать Thread.interrupt () где-нибудь в главном потоке, перехватить и обработать InterruptedException соответственно в рабочем потоке.

2 голосов
/ 22 декабря 2011

«решил» это с помощью «фиктивных сообщений» повсюду.Это плохая идея?Каковы возможные решения?

Это неплохая идея, она называется «Ядовитые пилюли» и является разумным способом остановить службу на основе потоков.

Но она работает только тогда, когдачисло производителей и потребителей известно.

В размещенном вами коде есть два потока: один - «основной поток», который производит данные, другой - «поток обработки», который потребляет данные, ««Отравляющие таблетки» хорошо подходит для этого обстоятельства.

Но чтобы представить, если у вас есть и другие производители, как потребитель узнает, когда следует остановиться (только когда все производители отправляют «Отравляющие таблетки»), вам необходимо точно знать,количество всех производителей, а также проверить количество «Ядовитых таблеток» у потребителя, если оно равняется количеству производителей, что означает, что все производители перестали работать, то потребитель останавливается.

In »основной поток", вам нужно поймать InterruptedException, так как в противном случае" основной поток "может не установить" Ядовитую пилюлю ".Вы можете сделать это, как показано ниже,

...
try {
    // do normal processing
} catch (InterruptedException e) { /*  fall through  */  }
finally {
    MyObject dummy = new MyObject();
    dummy.setData("END");
    ...
}
...

Кроме того, вы можете попробовать использовать ExecutorService для решения всех ваших проблем.

(Это работает, когда вам просто нужно выполнить некоторые работы, а затем остановиться, когда все закончится)

void doWorks(Set<String> works, long timeout, TimeUnit unit)
    throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    try {
        for (final String work : works)
            exec.execute(new Runnable() {
                    public void run() {
                        ...
                    }
                });
    } finally {
        exec.shutdown();
        exec.awaitTermination(timeout, unit);
    }
}

Я учусь и не хочу начинать "делать"это неправильный путь "

Возможно, вам придется прочитать Книгу: Параллелизм Java на практике.Поверь мне, это лучшее.

0 голосов
/ 22 декабря 2011

Что вы могли бы сделать (что я сделал в недавнем проекте), это обернуть очередь и затем добавить метод 'isOpen()'.

class ClosableQ<T> {

   boolean isOpen = true;

   private LinkedBlockingQueue<T> lbq = new LinkedBlockingQueue<T>();

   public void put(T someObject) {
      if (isOpen) {
         lbq.put(someObject);
      }
   }

   public T get() {
      if (isOpen) {
         return lbq.get(0);
      }
   }

   public boolean isOpen() {
      return isOpen;
   }

   public void open() {
      isOpen = true;
   }

   public void close() {
      isOpen = false;
   }
}

Таким образом, ваш поток записи становится примерно таким:

while (reader.hasNext() ) {
  // read the file and put it into the queue
  dataQ.put(someObject);
} 
// now we're done
dataQ.close();

и ветка читателя:

while (dataQ.isOpen) {
   someObject = dataQ.get();
}

Конечно, вы можете расширить список, но это дает пользователю уровень доступа, который вам может не понадобиться.И вам нужно добавить некоторые вещи для параллелизма в этот код, например AtomicBoolean.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...