Оборачивание асинхронных вычислений в синхронные (блокирующие) вычисления - PullRequest
47 голосов
/ 02 февраля 2010

похожих вопросов:

У меня есть объект с методом, который я хотел бы представить клиентам библиотеки (особенно клиентам сценариев) как что-то вроде:

interface MyNiceInterface
{
    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg);
    public Future<Baz> doSomething(Foo fooArg, Bar barArg);
    // doSomethingAndBlock is the straightforward way;
    // doSomething has more control but deals with
    // a Future and that might be too much hassle for
    // scripting clients
}

но примитивный «материал», который у меня есть, представляет собой набор управляемых событиями классов:

interface BazComputationSink
{
    public void onBazResult(Baz result);
}

class ImplementingThing
{
    public void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink);
}

, где ImplementingThing принимает входные данные, выполняет некоторые непонятные вещи, такие как постановка задач в очередь задач, а затем, когда возникает результат, sink.onBazResult() вызывается в потоке, который может быть или не быть тем же потоком, что и ImplementingThing.doSomethingAsync ( ) был вызван.

Можно ли как-нибудь использовать имеющиеся у меня функции, управляемые событиями, вместе с примитивами параллелизма для реализации MyNiceInterface, чтобы клиенты сценариев могли с радостью ждать в блокирующем потоке?

edit: можно ли использовать FutureTask для этого?

Ответы [ 7 ]

46 голосов
/ 02 февраля 2010

Использование вашей собственной реализации Future:

public class BazComputationFuture implements Future<Baz>, BazComputationSink {

    private volatile Baz result = null;
    private volatile boolean cancelled = false;
    private final CountDownLatch countDownLatch;

    public BazComputationFuture() {
        countDownLatch = new CountDownLatch(1);
    }

    @Override
    public boolean cancel(final boolean mayInterruptIfRunning) {
        if (isDone()) {
            return false;
        } else {
            countDownLatch.countDown();
            cancelled = true;
            return !isDone();
        }
    }

    @Override
    public Baz get() throws InterruptedException, ExecutionException {
        countDownLatch.await();
        return result;
    }

    @Override
    public Baz get(final long timeout, final TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        countDownLatch.await(timeout, unit);
        return result;
    }

    @Override
    public boolean isCancelled() {
        return cancelled;
    }

    @Override
    public boolean isDone() {
        return countDownLatch.getCount() == 0;
    }

    public void onBazResult(final Baz result) {
        this.result = result;
        countDownLatch.countDown();
    }

}

public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
    BazComputationFuture future = new BazComputationFuture();
    doSomethingAsync(fooArg, barArg, future);
    return future;
}

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
    return doSomething(fooArg, barArg).get();
}

Решение создает внутреннюю функцию CountDownLatch, которая очищается после получения обратного вызова. Если пользователь вызывает get, CountDownLatch используется для блокировки вызывающего потока до завершения вычислений и вызова обратного вызова onBazResult. CountDownLatch гарантирует, что если обратный вызов произойдет до вызова get (), метод get () немедленно вернется с результатом.

16 голосов
/ 02 февраля 2010

Ну, есть простое решение сделать что-то вроде:

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
  final AtomicReference<Baz> notifier = new AtomicReference();
  doSomethingAsync(fooArg, barArg, new BazComputationSink() {
    public void onBazResult(Baz result) {
      synchronized (notifier) {
        notifier.set(result);
        notifier.notify();
      }
    }
  });
  synchronized (notifier) {
    while (notifier.get() == null)
      notifier.wait();
  }
  return notifier.get();
}

Конечно, это предполагает, что ваш Baz результат никогда не будет нулевым ...

12 голосов
/ 21 сентября 2012

Библиотека Google guava имеет простой в использовании SettableFuture, который делает эту проблему очень простой (около 10 строк кода).

public class ImplementingThing {

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
    try {
        return doSomething(fooArg, barArg).get();
    } catch (Exception e) {
        throw new RuntimeException("Oh dear");
    }
};

public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
    final SettableFuture<Baz> future = new SettableFuture<Baz>();
    doSomethingAsync(fooArg, barArg, new BazComputationSink() {
        @Override
        public void onBazResult(Baz result) {
            future.set(result);
        }
    });
    return future;
};

// Everything below here is just mock stuff to make the example work,
// so you can copy it into your IDE and see it run.

public static class Baz {}
public static class Foo {}
public static class Bar {}

public static interface BazComputationSink {
    public void onBazResult(Baz result);
}

public void doSomethingAsync(Foo fooArg, Bar barArg, final BazComputationSink sink) {
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Baz baz = new Baz();
            sink.onBazResult(baz);
        }
    }).start();
};

public static void main(String[] args) {
    System.err.println("Starting Main");
    System.err.println((new ImplementingThing()).doSomethingAndBlock(null, null));
    System.err.println("Ending Main");
}
4 голосов
/ 16 февраля 2017

Это очень просто с RxJava 2.x:

try {
    Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
            doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
            .toFuture().get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

Или без лямбда-нотации:

Baz baz = Single.create(new SingleOnSubscribe<Baz>() {
                @Override
                public void subscribe(SingleEmitter<Baz> emitter) {
                    doSomethingAsync(fooArg, barArg, new BazComputationSink() {
                        @Override
                        public void onBazResult(Baz result) {
                            emitter.onSuccess(result);
                        }
                    });
                }
            }).toFuture().get();

Еще проще:

Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
                doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
                .blockingGet();
4 голосов
/ 27 сентября 2014

Очень простой пример, просто чтобы понять CountDownLatch без каких-либо дополнительный код.

A java.util.concurrent.CountDownLatch - это конструкция параллелизма, которая позволяет одному или нескольким потокам ожидать завершения определенного набора операций.

A CountDownLatch инициализируется с заданным количеством. Это число уменьшается при вызове метода countDown(). Потоки, ожидающие, пока это число достигнет нуля, могут вызывать один из методов await(). Вызов await() блокирует поток, пока счетчик не достигнет нуля.

Ниже приведен простой пример. После того, как Декремент звонит countDown() 3 раза на CountDownLatch, ожидающий Официант освобождается от вызова await().

Вы также можете упомянуть некоторые TimeOut, чтобы ждать.

CountDownLatch latch = new CountDownLatch(3);

Waiter      waiter      = new Waiter(latch);
Decrementer decrementer = new Decrementer(latch);

new Thread(waiter)     .start();
new Thread(decrementer).start();

Thread.sleep(4000);
public class Waiter implements Runnable{

    CountDownLatch latch = null;

    public Waiter(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Waiter Released");
    }
}

// --------------

public class Decrementer implements Runnable {

    CountDownLatch latch = null;

    public Decrementer(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {

        try {
            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Ссылка

Если вы не хотите использовать CountDownLatch или вам требуется что-то такое же, как в Facebook, в отличие от функциональности. Означает, что при вызове одного метода другой метод не вызывается.

В этом случае вы можете объявить

private volatile Boolean isInprocessOfLikeOrUnLike = false;

и затем вы можете проверить в начале вашего вызова метода, что если это false, то вызов метода в противном случае возвращает .. зависит от вашей реализации.

3 голосов
/ 14 марта 2016

Вот более общее решение, основанное на ответе Пола Уогланда:

public abstract class AsyncRunnable<T> {
    protected abstract void run(AtomicReference<T> notifier);

    protected final void finish(AtomicReference<T> notifier, T result) {
        synchronized (notifier) {
            notifier.set(result);
            notifier.notify();
        }
    }

    public static <T> T wait(AsyncRunnable<T> runnable) {
        final AtomicReference<T> notifier = new AtomicReference<>();

        // run the asynchronous code
        runnable.run(notifier);

        // wait for the asynchronous code to finish
        synchronized (notifier) {
            while (notifier.get() == null) {
                try {
                    notifier.wait();
                } catch (InterruptedException ignore) {}
            }
        }

        // return the result of the asynchronous code
        return notifier.get();
    }
}

Вот пример того, как его использовать ::

    String result = AsyncRunnable.wait(new AsyncRunnable<String>() {
        @Override
        public void run(final AtomicReference<String> notifier) {
            // here goes your async code, e.g.:
            new Thread(new Runnable() {
                @Override
                public void run() {
                    finish(notifier, "This was a asynchronous call!");
                }
            }).start();
        }
    });

Более подробную версию кода можно найти здесь: http://pastebin.com/hKHJUBqE

EDIT: Пример, связанный с этим вопросом:

public Baz doSomethingAndBlock(final Foo fooArg, final Bar barArg) {
    return AsyncRunnable.wait(new AsyncRunnable<Baz>() {
        @Override
        protected void run(final AtomicReference<Baz> notifier) {
            doSomethingAsync(fooArg, barArg, new BazComputationSink() {
                public void onBazResult(Baz result) {
                    synchronized (notifier) {
                        notifier.set(result);
                        notifier.notify();
                    }
                }
            });
        }
    });
}
1 голос
/ 06 мая 2019

Самый простой способ (который работает для меня) -

  1. Создать очередь блокировки
  2. Вызовите асинхронный метод - используйте обработчик, который предлагает результат в эту очередь блокировки.
  3. Опрос очереди (вот где вы блокируете) для результата.

    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) throws InterruptedException {
        final BlockingQueue<Baz> blocker = new LinkedBlockingQueue();
        doSomethingAsync(fooArg, barArg, blocker::offer);
        // Now block until response or timeout
        return blocker.poll(30, TimeUnit.SECONDS);
    }
    
...