Шаблон для выполнения одновременных задач только один раз - PullRequest
0 голосов
/ 30 июня 2011

Я работаю на сервере Java, который отправляет сообщения xmpp, и рабочие выполняют задачи от моих клиентов.

private static ExecutorService threadpool = Executors.newCachedThreadPool();

DispatchWorker worker = new DispatchWorker(connection, packet);
threadpool.execute(worker);

Работает нормально, но мне нужно немного больше.

  1. Я не хочу выполнять один и тот же запрос несколько раз.
  2. Мой рабочий может запустить другой поток с заданием backround, которое также может запускаться только один раз. Пул потоков в рабочих потоках.

Я могу идентифицировать запросы по строке, а также могу дать заданию фоновых заданий идентификатор для их идентификации.

Моим решением будет синхронизированная хэш-карта, в которой мои текущие задачи будут зарегистрированы с их идентификатором. Ссылка на карту будет передана рабочим потокам, которым они удалят свои записи после завершения.

Это решение кажется немного неуклюжим, поэтому я хотел узнать, есть ли более элегантные шаблоны / лучшие практики.

С наилучшими пожеланиями, м

Ответы [ 5 ]

0 голосов
/ 01 июля 2011

Мы изначально написали наши собственные утилиты , чтобы справиться с этим, но если вы хотите, чтобы результаты были запомнены, то Guava * ComputingMap инкапсулирует инициализацию однимтолько один поток (с другими потоками, блокирующими и ожидающими результат), и памятка.

Он также поддерживает различные стратегии истечения срока действия.

Использование простое, вы создаете его с помощью функции инициализации:

Map<Long, Foo> cache = new MapMaker().makeComputingMap(new Function<Long, Foo>() {
  public Foo apply(String key) {
    return … // init with expensive calculation
  }
});

, а затем просто вызовите его:

Foo foo = cache.get("key");

Первым потоком, который запросит «ключ», будет тот, кто выполнит инициализацию

0 голосов
/ 30 июня 2011

Я хотел бы использовать очереди и рабочие потоки демона, которые всегда работают, и ждать, пока что-то поступит в очередь.Таким образом, гарантируется, что только один работник работает по запросу.Если вы хотите, чтобы запускался только один поток, уменьшите POOLSIZE до 1 или используйте newSingleThreadExecutor.

Я не совсем понимаю ваше второе требование: вы имеете в виду, что только 1 поток может быть запущен в качестве фоновой задачи?Если это так, вы можете создать еще один SingleThreadExecutor и использовать его для фоновой задачи.Тогда не имеет большого смысла иметь значение POOLSIZE> 1, если только работа, выполняемая в фоновом потоке, не очень коротка по сравнению с работой, выполняемой в самом работнике.

private static interface Request {};
private final int POOLSIZE = 10;
private final int QUEUESIZE = 1000;
BlockingQueue<Request> e = new LinkedBlockingQueue<Request>(QUEUESIZE);

public void startWorkers() {
    ExecutorService threadPool = Executors.newFixedThreadPool(POOLSIZE);
    for(int i=0; i<POOLSIZE; i++) {
        threadPool.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    final Request request = e.take();
                    doStuffWithRequest(request);
                } catch (InterruptedException e) {
                    // LOG
                    // Shutdown worker thread.
                }
            }
        });
    }
}
public void handleRequest(Request request) {
    if(!e.offer(request)) {
        //Cancel request, queue is full;
    }
}

Во время запуска начинающие рабочие запускаютсярабочие (сюрприз!).handleRequest обрабатывает запросы, поступающие от веб-службы, сервлета или чего-либо еще.

Конечно, вам нужно адаптировать «Request» и «doStuffWithRequest» к вашим потребностям, добавить дополнительную логику для выключения и т. д.

0 голосов
/ 30 июня 2011

Я полагаю, что использование карты - это нормально.Но вместо синхронизированного HashMap вы также можете использовать ConcurrenHashMap, который позволяет указывать уровни параллелизма, т.е. сколько потоков может работать с картой одновременно.А также он имеет атомарную putIfAbsent операцию.

0 голосов
/ 30 июня 2011

Это именно то, что делает Кварц (хотя он делает гораздо больше, как планирование заданий в будущем).

0 голосов
/ 30 июня 2011

Вы можете использовать пул потоков Singleton или передать пул потоков в качестве аргумента. (У меня был бы бассейн final)

Вы можете использовать HashSet для защиты добавления дублирующих задач.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...