Как создавать и обрабатывать сообщения из рабочих потоков - PullRequest
2 голосов
/ 30 октября 2011

Как создавать асинхронные сообщения из рабочих потоков в управляющий поток.Рамки ниже.Во фрагменте Worker реализует Runnable, а threadManager - ExecutorService.Рабочие потоки работают долго и должны периодически передавать сообщения о ходе выполнения менеджеру.Одним из вариантов является использование очереди блокировки, но я раньше этого не делал.

    RunnableTester_rev2.threadManager = Executors.newFixedThreadPool( nThreadsAllowed );

    final List<Worker> my_tasks = new ArrayList<Worker>();
    for ( int i = 0; i < nThreadsToRun; i++ )
    {
        // The Worker thread is created here.
        // Could pass a blocking queue to handle messages.
        my_tasks.add( new Worker( ) );
    }

    final List<Future<?>> fList = new ArrayList<Future<?>>();

    for ( final Worker task : my_tasks )
    {
        fList.add( RunnableTester_rev2.threadManager.submit( task ) );
    }

    RunnableTester_rev2.threadManager.shutdown();


    /**
     * Are we all done
     */
    boolean isDone = false;

    /**
     * Number of threads finished
     */
    int nThreadsFinished = 0;

    /**
     * Keep processing until all done
     */
    while ( !isDone )
    {
        /*
         * Are all threads completed?
         */
        isDone = true;
        int ii = 0;
        nThreadsFinished = 0;
        for ( final Future<?> k : fList )
        {
            if ( !isThreadDoneList.get( ii ) )
            {
                final boolean isThreadDone = k.isDone();
                if ( !isThreadDone )
                {
                    // Reduce printout by removing the following line
                    // System.out.printf( "%s is not done\n", mywks.get( ii ).getName() );
                    isDone = false;
                }
            }
            else
            {
                nThreadsFinished++;
            }
            ii++;
        }

        /*
        *  WOULD LIKE TO PROCESS Worker THREAD MESSAGES HERE
        */

    }

Ответы [ 2 ]

1 голос
/ 30 октября 2011

Это действительно сложно. Я бы сделал что-то вроде этого.

// no need to use a field.
ExecutorService threadManager = Executors.newFixedThreadPool( nThreadsAllowed );
List<Future<Result>> futures = new ArrayList<Future<Result>>();
for ( int i = 0; i < nThreadsToRun; i++ )
    // Worker implements Callable<Result>
    futures.add(threadManager.submit(new Worker( )));
threadManager.shutdown();
threadManager.awaitTermination(1, TimeUnit.MINUTE);
for(Future<Result> future: futures) {
    Result result = future.get();
    // do something with the result
}
0 голосов
/ 31 октября 2011

Я бы использовал BlockingQueue. Вы можете попробовать что-то вроде этого:

final BlockingQueue<Message> q = new LinkedBlockingQueue<Message>();
// start all the worker threads,
// making them report progress by adding Messages into the queue
for (...) {
  threadManager.submit(new Runnable() {
    // initialize the worker
    q.offer(new Message("10% completed"));
    // do half of the work
    q.offer(new Message("50% completed"));
    // do the rest of the work
    q.offer(new Message("100% completed"));
  });
}
// after starting all the workers, call a blocking retrieval method
// on the queue, waiting for the messages
while (!(everything is done)) {
  Message m = q.take();
  // process the message
}

Идея состоит в том, что все рабочие потоки и основной поток имеют общую очередь блокировки. Рабочие потоки являются производителями, а основной поток - потребителями.

Здесь я использую .offer(...), но вы можете использовать .add(...), и вы получите то же самое в этом случае. Разница будет в том, что первый вернёт false, если очередь заполнена, а последний выдаст исключение. Это не может произойти здесь, так как я создал экземпляр неограниченной очереди (очевидно, вы можете получить OutOfMemoryError). Однако, если вы используете .put(...), вам придется иметь дело с неконтролируемым InterruptedException (который в этом случае бесполезен: его нельзя выбросить, потому что очередь никогда не заполнена - она ​​не ограничена), поэтому метод никогда не будет блокироваться).

Очевидно, что вы можете улучшить его, реорганизовав вызовы q.offer(...) в метод reportStatus(...) или что-то в этом роде.


Может быть, вы хотите что-то другое, чем я предложил выше. Возможно, вы хотите, чтобы основной поток получал асинхронные сообщения только тогда, когда каждый рабочий поток завершает свою работу.

Мое решение похоже на то, что было предложено Питером Лоури, с той разницей, что с этим решением управляющий поток получит уведомление, как только закончится какой-либо из рабочих потоков (в то время как с решением Питера это результат рабочих потоков будет получен по порядку - поэтому, если первый рабочий поток будет работать вечно, даже если все другие рабочие потоки завершат свою работу, вы об этом не узнаете). В зависимости от того, что вы делаете, вы можете повысить производительность.

Обратите внимание, что с помощью этого решения вы не сможете определить, какой поток завершил свою работу (для этого у вашего Worker класса должен быть метод типа .getJobId() или что-то подобное), в то время как с решением Питера вы будете точно знать какой поток завершил свою работу (поскольку вы выполняете итерацию в списке futures в том же порядке, в котором вы создали экземпляр и запустили Workers).

Если это так, вы можете использовать ExecutorCompletionService. Он инкапсулирует ExecutorService и предоставляет метод .take() (блокирующий поиск), который вернет Future s из службы исполнителя сразу после их завершения:

ExecutorCompletionService<Worker> ecs = new ExecutorCompletionService(executorService);
// Worker has to be a callable
// Now you submit your tasks to the ecs:
for (...) {
  ecs.submit(new Worker(...));
}
// Now you block on .take():
while (!(everything is done)) {
  Future<Worker> f = ecs.take();
  // f.get(), for example, to check the results
}

CompletionService гарантирует, что, как только вы возьмете Future, вызов его метода .get() будет немедленно завершен: либо путем возврата результата, либо с помощью исключения ExecutionException, инкапсулирующего исключение, выданное Callable.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...