Исполнители Java: как получить уведомление, не блокируя, когда задача завершена? - PullRequest
128 голосов
/ 05 мая 2009

Скажем, у меня есть очередь, полная задач, которые мне нужно отправить в службу исполнителя. Я хочу, чтобы они обрабатывались по одному. Самый простой способ, который я могу придумать, это:

  1. Возьмите задание из очереди
  2. Отправить его исполнителю
  3. Вызовите .get для возвращенного Future и заблокируйте, пока не будет доступен результат
  4. Возьми еще одно задание из очереди ...

Однако я стараюсь полностью не блокировать. Если у меня будет 10 000 таких очередей, для которых нужно обрабатывать свои задачи по одной, я исчерпаю пространство стека, потому что большинство из них будут удерживать заблокированные потоки.

То, что я хотел бы, это отправить задачу и предоставить обратный вызов, который вызывается, когда задача завершена. Я буду использовать это уведомление об обратном вызове в качестве флага для отправки следующего задания. (функционал java и jetlang, очевидно, используют такие неблокирующие алгоритмы, но я не могу понять их код)

Как я могу это сделать, используя java.util.concurrent JDK, если не считать написания моей собственной службы исполнителя?

(очередь, которая питает меня этими задачами, может сама блокироваться, но это проблема, которую необходимо решить позже)

Ответы [ 11 ]

128 голосов
/ 05 мая 2009

Определите интерфейс обратного вызова для получения любых параметров, которые вы хотите передать в уведомлении о завершении. Затем вызовите его в конце задачи.

Вы даже можете написать общую оболочку для задач Runnable и отправить их в ExecutorService. Или см. Ниже механизм, встроенный в Java 8.

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

С CompletableFuture Java 8 включает более сложные средства для составления конвейеров, где процессы могут выполняться асинхронно и условно. Вот надуманный, но полный пример уведомления.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}
47 голосов
/ 13 марта 2014

В Java 8 вы можете использовать CompletableFuture . Вот пример, который я использовал в своем коде, где я использую его для извлечения пользователей из моей пользовательской службы, сопоставления их с моими объектами представления, а затем обновления моего представления или отображения диалога об ошибках (это приложение с графическим интерфейсом):

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

Выполняется асинхронно. Я использую два частных метода: mapUsersToUserViews и updateView.

46 голосов
/ 14 ноября 2012

Используйте API будущего прослушивания Guava и добавьте обратный вызов. Ср с сайта:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});
24 голосов
/ 17 февраля 2012

Вы можете расширить класс FutureTask и переопределить метод done(), а затем добавить объект FutureTask в ExecutorService, чтобы метод done() вызывался, когда FutureTask сразу завершался.

14 голосов
/ 05 мая 2009

ThreadPoolExecutor также имеет beforeExecute и afterExecute методы подключения, которые вы можете переопределить и использовать. Вот описание от ThreadPoolExecutor * Javadocs .

Методы крюка

Этот класс предоставляет защищенные переопределяемые beforeExecute(java.lang.Thread, java.lang.Runnable) и afterExecute(java.lang.Runnable, java.lang.Throwable) методы, которые вызываются до и после выполнения каждой задачи. Их можно использовать для манипулирования средой исполнения; например, повторная инициализация ThreadLocals, сбор статистики или добавление записей журнала. Кроме того, метод terminated() может быть переопределен для выполнения любой специальной обработки, которая должна быть выполнена после полного завершения Executor. Если методы ловушки или обратного вызова генерируют исключения, внутренние рабочие потоки могут, в свою очередь, завершиться сбоем и внезапно завершиться.

6 голосов
/ 05 мая 2009

Используйте CountDownLatch.

Это из java.util.concurrent, и это именно тот способ, которым нужно дождаться завершения нескольких потоков, прежде чем продолжить.

Чтобы добиться нужного вам эффекта обратного вызова, это требует немного дополнительной дополнительной работы. А именно, обрабатывая это самостоятельно в отдельном потоке, который использует CountDownLatch и ждет его, а затем продолжает уведомлять обо всем, что вам нужно уведомить. Не существует встроенной поддержки обратного вызова или чего-либо подобного этому эффекту.


РЕДАКТИРОВАТЬ: Теперь, когда я еще больше понимаю ваш вопрос, я думаю, что вы слишком далеко зашли, без необходимости. Если вы берете обычный SingleThreadExecutor, задайте ему все задачи, и он будет выполнять очередь изначально.

4 голосов
/ 05 мая 2009

Если вы хотите убедиться, что никакие задачи не будут выполняться одновременно, используйте SingleThreadedExecutor . Задачи будут обработаны в порядке их отправки. Вам даже не нужно задерживать задачи, просто отправьте их исполнительному директору.

1 голос
/ 06 апреля 2016

Простой код для реализации механизма Callback с использованием ExecutorService

import java.util.concurrent.*;
import java.util.*;

public class CallBackDemo{
    public CallBackDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(5);

        try{
            for ( int i=0; i<5; i++){
                Callback callback = new Callback(i+1);
                MyCallable myCallable = new MyCallable((long)i+1,callback);
                Future<Long> future = service.submit(myCallable);
                //System.out.println("future status:"+future.get()+":"+future.isDone());
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        service.shutdown();
    }
    public static void main(String args[]){
        CallBackDemo demo = new CallBackDemo();
    }
}
class MyCallable implements Callable<Long>{
    Long id = 0L;
    Callback callback;
    public MyCallable(Long val,Callback obj){
        this.id = val;
        this.callback = obj;
    }
    public Long call(){
        //Add your business logic
        System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
        callback.callbackMethod();
        return id;
    }
}
class Callback {
    private int i;
    public Callback(int i){
        this.i = i;
    }
    public void callbackMethod(){
        System.out.println("Call back:"+i);
        // Add your business logic
    }
}

выход:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

Ключевые примечания:

  1. Если вы хотите последовательно обрабатывать задачи в порядке FIFO, замените newFixedThreadPool(5) на newFixedThreadPool(1)
  2. Если вы хотите обработать следующую задачу после анализа результата из callback предыдущей задачи, просто снимите комментарий ниже строки

    //System.out.println("future status:"+future.get()+":"+future.isDone());
    
  3. Вы можете заменить newFixedThreadPool() одним из

    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor
    

    в зависимости от вашего варианта использования.

  4. Если вы хотите обрабатывать метод обратного вызова асинхронно

    а. Передайте общую ExecutorService or ThreadPoolExecutor в вызываемую задачу

    б. Преобразуйте ваш Callable метод в Callable/Runnable задачу

    с. Нажмите задачу обратного вызова на ExecutorService or ThreadPoolExecutor

1 голос
/ 19 марта 2016

Это расширение ответа Пача с использованием ListenableFuture.

Гуавы.

В частности, Futures.transform() возвращает ListenableFuture, поэтому может использоваться для цепочки асинхронных вызовов. Futures.addCallback() возвращает void, поэтому не может использоваться для цепочки, но подходит для обработки успеха / неудачи при асинхронном завершении.

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
    Futures.transform(database, database -> database.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, cursor -> cursorToFooList(cursor));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

ПРИМЕЧАНИЕ: Помимо цепочки асинхронных задач, Futures.transform() также позволяет планировать каждую задачу для отдельного исполнителя (не показан в этом примере).

1 голос
/ 29 августа 2015

Просто добавьте к ответу Мэтта, который помог, вот более конкретный пример, демонстрирующий использование обратного вызова.

private static Primes primes = new Primes();

public static void main(String[] args) throws InterruptedException {
    getPrimeAsync((p) ->
        System.out.println("onPrimeListener; p=" + p));

    System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
    void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
    CompletableFuture.supplyAsync(primes::getNextPrime)
        .thenApply((prime) -> {
            System.out.println("getPrimeAsync(); prime=" + prime);
            if (listener != null) {
                listener.onPrime(prime);
            }
            return prime;
        });
}

Вывод:

    getPrimeAsync(); prime=241
    onPrimeListener; p=241
    Adios mi amigito
...