Синхронизировать параллельные запросы, чтобы поделиться результатами медленной операции - PullRequest
0 голосов
/ 14 ноября 2018

У меня есть служба пользовательского интерфейса Java, в которой есть метод API, который вызывает относительно медленную операцию (скажем, ~ 30 секунд). Операция не имеет параметров, но она работает с внешними данными, которые изменяются (относительно медленно) с течением времени. Для метода не критично возвращать самые последние результаты - если им 30 секунд, это приемлемо.

В конечном итоге мне нужно оптимизировать реализацию медленной операции, но в качестве кратковременного исправления я хотел бы сделать операцию взаимоисключающей, например, если второй входящий запрос (в отдельном потоке) попытается вызвать операция, в то время как другая уже выполняется, затем вторая блокируется, пока первая не завершится. Затем второй поток использует результаты первого вызова операции - то есть он не пытается снова запустить операцию.

например:.

class MyService {
    String serviceApiMmethod() {
       // If a second thread attempts to call this method while another is in progress
       // then block here until the first returns and then use those results
       // (allowing it to return immediately without a second call to callSlowOperation).
       return callSlowOperation();
    }
}

Какой предпочтительный общий подход для этого в Java (8). Я предполагаю, что мог бы использовать CountDownLatch, но не ясно, как лучше всего разделить результат между потоками. Существует ли существующий примитив параллелизма, способствующий этому?

РЕДАКТИРОВАТЬ: Мне нужно очистить любые ссылки на результат, как только все потоки использовали его (т.е. вернули его вызывающей стороне), так как это относительно большой объект, который должен быть GC 'как можно скорее насколько это возможно.

Ответы [ 4 ]

0 голосов
/ 15 ноября 2018

Простая идея

Версия 1:

class Foo {
    public String foo() throws Exception {
        synchronized (this) {
            if (counter.incrementAndGet() == 1) {
                future = CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(1000 * (ThreadLocalRandom.current().nextInt(3) + 1));
                    } catch (InterruptedException e) {
                    }
                    return "ok" + ThreadLocalRandom.current().nextInt();
                });
            }
        }

        String result = future.get();
        if (counter.decrementAndGet() == 0) {
            future = null;
        }

        return result;
    }

    private AtomicInteger counter = new AtomicInteger();
    private Future<String> future;
}

Версия 2: вместе с @ Александр Семянников

public class MyService {
    private AtomicInteger counter = new AtomicInteger();
    private volatile String result;

    public String serviceApiMethod() {
        counter.incrementAndGet();
        try {
            synchronized (this) {
                if (result == null) {
                    result = callSlowOperation();
                }
            }
            return result;
        } finally {
            if (counter.decrementAndGet() == 0) {
                synchronized (this) {
                    if (counter.get() == 0) {
                        result = null;
                    }
                }
            }
        }
    }

    private String callSlowOperation() {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return Thread.currentThread().getName();
    }
}
0 голосов
/ 14 ноября 2018

Другой подход (который, я думаю, может быть даже лучше) состоит в добавлении всех потоков, которые запрашивают результат, в синхронизированную коллекцию. Затем, когда результат прибудет - отправьте ответы обратно в потоки. Вы можете использовать потребительский интерфейс java 8, чтобы сделать его более привлекательным. Это не будет тратить процессорное время (как с thread.sleep или даже с countDownLatch и другими современными классами параллельных Java-вычислений). Требуется, чтобы эти потоки имели метод обратного вызова для принятия результата, но это могло бы даже облегчить чтение вашего кода:

class MyService {
    private  static volatile boolean isProcessing;
    private synchronized static  boolean  isProcessing() {
        return isProcessing;
    }
    private static Set<Consumer<String>> callers=Collections.synchronizedSet(new HashSet<>());

    void serviceApiMmethod(Consumer<String> callBack) {
       callers.add(callBack);
       callSlowOperation();
    }

    private synchronized static  void callSlowOperation() {
        if(isProcessing())
            return;
        isProcessing=true;
        try { Thread.sleep(1000); }catch (Exception e) {}//Simulate slow operation
        final String result="slow result";
        callers.forEach(consumer-> consumer.accept(result));
        callers.clear();
        isProcessing=false;

    }
}

И вызывающие темы:

class ConsumerThread implements Runnable{
    final int threadNumber;
    public ConsumerThread(int num) {
        this.threadNumber=num;

    }
    public void processResponse(String response) {
        System.out.println("Thread ["+threadNumber+"] got response:"+response+" at:"+System.currentTimeMillis());
    }

    @Override
    public void run() {
            new MyService().serviceApiMmethod(this::processResponse);
    }


}

Таким образом, получающийся объект будет собирать мусор, потому что все потребители сразу его получат и выпустят.

И для проверки результатов:

public class Test{
    public static void main(String[] args) {
        System.out.println(System.currentTimeMillis());
        for(int i=0;i<5;i++) {
            new Thread(new ConsumerThread(i)).start();
        }
    }
}

И результат:

1542201686784
Thread [1] got response:slow result at:1542201687827
Thread [2] got response:slow result at:1542201687827
Thread [3] got response:slow result at:1542201687827
Thread [0] got response:slow result at:1542201687827
Thread [4] got response:slow result at:1542201687827

Все темы получили результат через 1 секунду. Вид реактивного программирования;) Это действительно меняет способ на более асинхронный, но если вызывающий поток хочет заблокировать выполнение при получении результата, он может его достичь. Сервис в основном говорит о том, что сделано. Это все равно что сказать: «моя операция медленная, поэтому вместо того, чтобы выполнять вызов для каждого из вас, я отправлю вам результат, как только буду готов - дайте мне потребительский метод»

0 голосов
/ 14 ноября 2018

ReentrantReadWriteLock будет проще в использовании:

class MyService {

  String result;
  ReadWriteLock lock = new ReentrantReadWriteLock(); 

  String serviceApiMmethod() {
    lock.readLock().lock();
    try {
      if (result == null || staleResult()) {
        lock.readLock().unlock();
        lock.writeLock().lock();
        try {
          if (result == null || staleResult()) {
            result = callSlowOperation();
          }
        } finally {
          lock.writeLock().unlock();
          lock.readLock().lock();
        }
      }
      return result;
    } finally {
       lock.readLock().unlock();
    }
  }
}

Здесь блокировка чтения защищает от устаревшего состояния чтения, а блокировка записи защищает от выполнения «медленной операции» несколькими способами одновременно.

0 голосов
/ 14 ноября 2018

В качестве решения вы можете использовать что-то вроде этого:

public class MyService {

    private volatile ResultHolder holder;

    public String serviceApiMethod() {
        if (holder != null && !isTimedOut(holder.calculated)) {
            return holder.result;
        }
        synchronized (this) {
            if (holder != null && !isTimedOut(holder.calculated)) {
                return holder.result;
            }
            String result = callSlowOperation();
            holder = new ResultHolder(result, LocalDateTime.now());
            return result;
        }
    }

    private static class ResultHolder {
        private String result;
        private LocalDateTime calculated;

        public ResultHolder(String result, LocalDateTime calculated) {
            this.result = result;
            this.calculated = calculated;
        }
    }
}

Обратите внимание, что MyService должен быть одноэлементным, а ResultHolder должен быть неизменным

...