Ожидание между задачами с SingleThreadExecutor - PullRequest
2 голосов
/ 07 июня 2010

Я пытаюсь (просто) создать блокирующую очередь потоков, где при отправке задачи метод ожидает завершения ее выполнения. Но самая сложная часть - это ожидание.

Вот мой код в 12:30, который я считаю излишним:

public void sendMsg(final BotMessage msg) {
    try {
        Future task;
        synchronized(msgQueue) {
            task = msgQueue.submit(new Runnable() {
                public void run() {
                    sendRawLine("PRIVMSG " + msg.channel + " :" + msg.message);
                }
            });
            //Add a seperate wait so next runnable doesn't get executed yet but
            //above one unblocks
            msgQueue.submit(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(Controller.msgWait);
                    } catch (InterruptedException e) {
                        log.error("Wait to send message interupted", e);
                    }
                }
            });
        }
        //Block until done
        task.get();
    } catch (ExecutionException e) {
        log.error("Couldn't schedule send message to be executed", e);
    } catch (InterruptedException e) {
        log.error("Wait to send message interupted", e);
    }
}

Как вы можете видеть, там много лишнего кода, чтобы просто подождать 1,7 секунды между задачами. Есть ли более простое и чистое решение или это оно?

Ответы [ 7 ]

5 голосов
/ 08 июня 2010

Хорошо, вот мысль. Вы можете использовать ScheduledExceutorService, который запомнит последний раз, когда вы выполнили исполняемый файл, и соответственно задержит следующее выполнение, вплоть до максимального времени ожидания (здесь жестко задано 1700).

    //@GuardedBy("msgQueue")
Date mostRecentUpdate = new Date();

public void sendMsg(final BotMessage msg) {
    try {
        Future task;
        synchronized (msgQueue) {               
            long delta = new Date().getTime() - mostRecentUpdate.getTime();
            task = msgQueue.schedule(new Runnable() {
                public void run() {
                    sendRawLine("PRIVMSG " + msg.channel + " :" + msg.message);
                }
            }, delta <= 1700 ?1700 : 0, TimeUnit.MILLISECONDS);

            mostRecentUpdate = new Date();
        }
        // Block until done
        task.get();
    } catch (ExecutionException e) {
        log.error("Couldn't schedule send message to be executed", e);
    } catch (InterruptedException e) {
        log.error("Wait to send message interupted", e);
    }
}
3 голосов
/ 07 июня 2010

Если ваша декларация выглядит следующим образом:

ExecutorService msgQueue = Executors.newSingleThreadExecutor();

Вы можете просто использовать этот код для достижения того, что вы ищете:

msgQueue.submit(new Runnable() {
        public void run() {
            sendRawLine("PRIVMSG " + msg.channel + " :" + msg.message);
            try {
                Thread.sleep(Controller.msgWait);
            } catch (InterruptedException e) {
                log.error("Wait to send message interupted", e);
            }
        }
    })

как однопоточный исполнитель может одновременно выполнять только одну задачу.

На что обратить внимание:

  1. Нет необходимости синхронизировать отправку в ExecutorService
  2. Нет необходимости в будущем, если вам не нужен результат выполнения
  3. Ваша обработка InterruptedException немного сложна.
1 голос
/ 08 июня 2010

Если вы хотите допустить наследование от ThreadPoolExecutor, вы можете определить нового Executor, который задерживается после завершения каждого выполнения, следующим образом (с одним потоком в пуле это ограничивает максимальную скорость выполнения задач каждые 1700 мс) :

final ThreadPoolExecutor msgQueue = new ThreadPoolExecutor(1, 1,0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
        protected void afterExecute(Runnable r, Throwable t) {
            try {
                Thread.sleep(Controller.msgWait));
            } catch (InterruptedException e) {
                log.error("Wait to send message interrupted", e);
            }
        }
    };

, а затем используйте его как обычно:

Future task = msgQueue.submit(new Runnable() {
        public void run() {
            sendRawLine("PRIVMSG " + msg.channel + " :" + msg.message);
        }
    });
    try {
        task.get();
    } catch (ExecutionException e) {
        log.error("Couldn't schedule send message to be executed", e);
    } catch (InterruptedException e) {
        log.error("Wait to send message interupted", e);
    }

Не обязательно, чтобы он был чище, чем использование ScheduledExecutorService, хотя он избегает синхронизированного блока, объявляя о будущем вне блока и вводя поле Date.

1 голос
/ 07 июня 2010

Я не совсем уверен, понимаю ли я вашу цель: зачем вам очередь из потоков? Почему вы не можете просто поставить сообщения в очередь?

Вы можете использовать производителя / потребителя ... иметь одного потребителя, который читает очередь, и несколько производителей заполняют ее.

Обновление

ОК, здесь обновленная версия, которая блокируется до тех пор, пока сообщение не будет поставлено в очередь, и отправляет не более одного сообщения в каждые 1700 миллисекунд.

int delay = 1700; // milliseconds
StopWatch stopwatch = new Stopwatch();
BlockingQueue<BotMessage> msgQueue = new BlockingQueue<BotMessage>();

public void main()
{
    // A consumer thread
    new Thread(new Runnable() {
        public void run() {
            while(true)
            {
                // Blocks until there is something in the queue
                BotMessage msg = msgQueue.take();
                stopwatch.stop();

                // Sleeps until the minimum delay time has passed (if it hasn't passed)
                if(stopwatch.getElapsedTime() < delay)
                {
                    Thread.sleep(delay-stopwatch.getElapsedTime());
                }
                stopwatch.start();

                sendRawLine("PRIVMSG " + msg.channel + " :" + msg.message);
            }
        }
    })).start();
}

// Producers call sendMsg
public void sendMsg(final BotMessage msg) {
    msgQueue.put(msg);
}

Вы можете найти реализацию секундомера здесь .

0 голосов
/ 25 июня 2010

Как предложено здесь , я публикую это как ответ, поскольку это самое чистое решение, которое я придумала, но не фактическую копию какого-либо ответа, опубликованного здесь.

Спасибо за помощь

 public final CustBlockingQueue<BotMessage> msgQueue = new CustBlockingQueue<BotMessage>(1);


     // A consumer thread
     new Thread(new Runnable() {
         public void run() {
             while (true)
                 try {
                     // Blocks until there is something in the queue
                     BotMessage msg = msgQueue.take();
                     sendRawLine("PRIVMSG " + msg.getChannel() + " :" + msg.getMessage());
                     //Release lock so that put() unblocks
                     msgQueue.lock.lockInterruptibly();
                     msgQueue.doneProcessing.signal();
                     msgQueue.lock.unlock();
                     //Wait before continuing
                     Thread.sleep(Controller.msgWait);
                 } catch (InterruptedException e) {
                     log.error("Wait for sending message interrupted", e);
                 }
         }
     }).start();

 public class CustBlockingQueue<E> extends ArrayBlockingQueue<E> {
     public ReentrantLock lock = new ReentrantLock(false);
     public Condition doneProcessing = lock.newCondition();

     public CustBlockingQueue(int capacity) {
         super(capacity);
     }

     public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        super.put(e);
        doneProcessing.await();
        lock.unlock();
    }
}
0 голосов
/ 07 июня 2010

Вместо будущего используйте CountDownLatch .

В основном:

public void sendMsg(final BotMessage msg) {
    try {
        final CountDownLatch latch = new CountDownLatch(1);
        msgQueue.submit(new Runnable() {
            public void run() {
                sendRawLine("PRIVMSG " + msg.channel + " :" + msg.message);
                latch.countDown();
                try {
                    Thread.sleep(Controller.msgWait);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); //This is usually a best practice
                    log.error("Wait to send message interupted", e);
                }
            }
        });
        //Block until done
        latch.await();
    } catch (ExecutionException e) {
        log.error("Couldn't schedule send message to be executed", e);
    } catch (InterruptedException e) {
        log.error("Wait to send message interupted", e);
    }
}
0 голосов
/ 07 июня 2010

Хотя я не уверен, почему вам нужно ждать, так как я думаю, что реализация очереди выберет задачи в порядке их отправки, то есть это будет FIFO и, следовательно, задачи должны быть выполнены (когда вы вызываете msgQueue.get () в порядке их представления. Пожалуйста, поправьте меня, если это не так.

Однако, если нам все еще нужно приостановить публикацию, мы можем сделать это, как указано ниже.

Не можете ли вы напрямую перевести тему в спящий режим после отправки сообщения в очередь вместо отправки задачи ожидания в очередь?

Так будет

synchronized(msgQueue) {
            task = msgQueue.submit(new Runnable() {
                public void run() {
                    sendRawLine("PRIVMSG " + msg.channel + " :" + msg.message);
                }
            });
            Thread.sleep(Controller.msgWait); // no other thread can submit a new task until this sleep is over.
        }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...