Получить результаты запланированных неблокирующих операций в Java - PullRequest
1 голос
/ 17 июня 2019

Я пытаюсь выполнить некоторые операции блокировки (скажем, HTTP-запрос) по расписанию и без блокировки.Допустим, у меня есть 10 запросов, и один запрос занимает 3 секунды, но я хотел бы не ждать 3 секунды, а подождать 1 секунду и отправить следующий.После завершения всех выполнений я хотел бы собрать все результаты в списке и вернуть их пользователю.

Ниже приведен прототип моего сценария (спящий поток используется в качестве операции блокировки вместо HTTP-запроса)

    public static List<Integer> getResults(List<Integer> inputs) throws InterruptedException, ExecutionException {
        List<Integer> results = new LinkedList<Integer>();
        Queue<Callable<Integer>> tasks = new LinkedList<Callable<Integer>>();
        List<Future<Integer>> futures = new LinkedList<Future<Integer>>();
        for (Integer input : inputs) {
            Callable<Integer> task = new Callable<Integer>() {
                public Integer call() throws InterruptedException {
                    Thread.sleep(3000);
                    return input + 1000;
                }
            };
            tasks.add(task);
        }

        ExecutorService es = Executors.newCachedThreadPool();
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
        ses.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                Callable<Integer> task = tasks.poll();
                if (task == null) {
                    ses.shutdown();
                    es.shutdown();
                    return;
                }
                futures.add(es.submit(task));
            }
        }, 0, 1000, TimeUnit.MILLISECONDS);

        while(true) {
            if(futures.size() == inputs.size()) {
                for (Future<Integer> future : futures) {
                    Integer result = future.get();
                    results.add(result);
                }
                return results;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        List<Integer> results = getResults(new LinkedList<Integer>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
        System.out.println(Arrays.toString(results.toArray()));
    }

Я жду в цикле while, пока все задачи не вернут правильный результат.Но оно никогда не входит в разрушающее состояние и бесконечно зацикливается.Всякий раз, когда я помещаю операцию ввода-вывода, такую ​​как logger или даже точку останова, она просто прерывает цикл while, и все становится нормально.

Я относительно новичок в параллелизме Java и пытаюсь понять, что происходит, и является ли этоправильный способ сделать.Я предполагаю, что операция ввода-вывода вызывает что-то в планировщике потоков и заставляет его проверять размеры коллекций.

1 Ответ

1 голос
/ 17 июня 2019

Вам нужно синхронизировать ваши темы. У вас есть два разных потока (основной поток и служебный поток exectuor), обращающихся к списку futures, и, поскольку LinkedList не синхронизирован, эти два потока видят два разных значения futures.

while(true) {
  synchronized(futures) {
    if(futures.size() == inputs.size()) {
      ...
    }
  }
}

Это происходит потому, что потоки в java используют кэш процессора для повышения производительности. Таким образом, каждый поток может иметь разные значения переменной, пока они не будут синхронизированы. Этот вопрос SO содержит дополнительную информацию по этому вопросу.

Также от этого ответа:

Это все о памяти. Потоки обмениваются данными через общую память, но когда в системе несколько процессоров, пытающихся получить доступ к одной и той же системе памяти, система становится узким местом. Поэтому процессорам в типичном многопроцессорном компьютере разрешается задерживать, переупорядочивать и кэшировать операции с памятью, чтобы ускорить процесс.

Это прекрасно работает, когда потоки не взаимодействуют друг с другом, но это вызывает проблемы, когда они действительно хотят взаимодействовать: если поток A сохраняет значение в обычной переменной, Java не гарантирует, когда (или даже если) поток B увидит изменение значения.

Для преодоления этой проблемы, когда это важно, Java предоставляет вам определенные средства синхронизации потоков. То есть получение потоков для согласования состояния памяти программы. Ключевое слово volatile и ключевое слово synchronized являются двумя способами установления синхронизации между потоками.

И, наконец, список futures не обновляется в вашем коде, поскольку основной поток постоянно занят из-за блока infinte while. Выполнение любой операции ввода-вывода в цикле while дает процессору достаточно места для обновления локального кэша.

Бесконечный цикл while, как правило, плохая идея, поскольку он очень ресурсоемкий. Добавление небольшой задержки перед следующей итерацией может сделать ее немного лучше (хотя все еще неэффективной).

...