Как узнать Runnable запланировано для повторного выполнения - PullRequest
0 голосов
/ 09 марта 2019

Мне нужно запустить пять потоков для получения данных из API несколько раз каждые 20 секунд, поэтому я использовал ScheduledExecutorService.

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

for (int i = 0; i < 5; i++) {
    scheduler.scheduleWithFixedDelay(Constant.workerThread[i], 0, delay, TimeUnit.SECONDS);
}

Как узнать (каждый раз), когда выполняются пять потоков?

Ответы [ 4 ]

2 голосов
/ 09 марта 2019

Редактировать: кажется, что люди на самом деле не понимают фрагмент кода. Я сделаю это жирным шрифтом, чтобы никто не подошел ко мне снова, управлял внутренним ExecutorService внешним образом, а не внутри Callable лямбды, заботясь о том, чтобы надлежащим образом отключить его при необходимости .

То, что вы можете сделать, - это управлять одной запланированной задачей, и внутри нее выполнять пять рабочих.

final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
scheduler.scheduleWithFixedDelay(
        () -> {
            final ExecutorService e = Executors.newFixedThreadPool(5);
            final CompletionService<?> cs = new ExecutorCompletionService<>(e);

            for (int i = 0; i < 5; i++) {
                // Constant.workerThread[i] is a Runnable
                cs.submit(Constant.workerThread[i], null);
            }

            for (int i = 0; i < 5; i++) {
                try {
                    // Will block until a Future<?> result is available.
                    // Doesn't matter which one, it will take the first available.
                    cs.take();
                } catch (final InterruptedException ignored) {
                    // Ingore the exception, as we care only
                    // about if all off them finished (abruptly or not)
                }
            }

            // All of them finished!
            e.shutdown();
        }, 0, 20, TimeUnit.SECONDS));

JavaDoc для ExecutorCompletionService

A CompletionService, который использует поставляемый Executor для выполнения задачи. Этот класс организует, что представленные задачи, по завершении, помещается в очередь, доступную с помощью take.

JavaDoc для ExecutorCompletionService#take

Извлекает и удаляет Future, представляющий следующее завершенное задача, ожидающая, если ее еще нет.

Это должно быть уточнено, но вы должны понять.

1 голос
/ 09 марта 2019

Если вы можете изменить исходный код запланированных задач, вы можете реализовать что-то вроде этого:

public class ScheduleExample {

    private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

    static abstract class RunnableWithNotification implements Runnable {

        @Override
        public final void run() {
            beforeRun();
            doRun();
            afterRun();
        }

        public abstract void doRun();

        public abstract void beforeRun();

        public abstract void afterRun();
    }

    public static void main(String... args) {

        long delay = 5;
        List<Runnable> tasks = Arrays.asList(
                newRunnableWithNotification(1),
                newRunnableWithNotification(2),
                newRunnableWithNotification(3),
                newRunnableWithNotification(4),
                newRunnableWithNotification(5));

        tasks.forEach(task -> scheduler.scheduleWithFixedDelay(task, 0, delay, TimeUnit.SECONDS));

    }

    private static Runnable newRunnableWithNotification(int i) {
        return new RunnableWithNotification() {
            @Override
            public void doRun() {
                System.out.println("Executing task " + i);
            }

            @Override
            public void beforeRun() {
                System.out.println("Before executing task " + i);
            }

            @Override
            public void afterRun() {
                System.out.println("After executed task " + i);
            }
        };
    }
}
1 голос
/ 09 марта 2019

Мы запускаем Runnable объекты, а не потоки

Потоки не «выполняются».

Вы должны передавать Runnable при вызове ScheduledExecutorService::scheduleAtFixdDelay.Меня беспокоит ваше имя Constant.workerThread.Вы не передаете потоки, вы передаете задачу для выполнения в каком-то потоке.Вам не нужно беспокоиться о том, какой поток выполняет задачу Runnable.Вам не нужно беспокоиться о нитях вообще.Обработка задачи, выполняемой в потоках, - это работа исполнителя, отсюда и название.

Похоже, вам не хватает основных понятий потоков и задач.Не удивительно, поскольку это сложная тема, когда только начинаешь.Я предлагаю изучить учебники Java по многопоточности и исполнителям, предоставляемые Oracle.com бесплатно.Затем выполните поиск в Интернете, чтобы узнать больше.В конце концов вам следует изучить превосходную книгу Брайана Гетца и др. Параллелизм Java на практике .

ScheduledFuture отслеживание завершения

Так что вы не будете следить запотоки.Вместо этого сосредоточьтесь на своих Runnable задачах.Чтобы отслеживать их состояние, захватите объект ScheduledFuture, возвращаемый по вашему вызову scheduleAtFixedDelay.В настоящее время вы игнорируете эти возвращенные объекты.

Этот запланированный будущий объект предлагает методы, чтобы увидеть, завершена ли задача или отменена.Вы также можете отменить задание.

0 голосов
/ 09 марта 2019

Вы можете настроить вид карты выполнения.
Всегда свежие данные о состоянии выполнения каждого запускаемого объекта.Это общий пример, поэтому вам нужно адаптировать его к вашим потребностям.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Application { 
    // Your mocked runnables
    private static List<Runnable> workerRunnables = new ArrayList<>();
    // This will be the map with always updated values, get the map at[i]
    // will return if workerThread[i is running]
    private static Map<Integer, Boolean> executionMap = new ConcurrentHashMap<>();
    private static final int threadPoolSize = 5;

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(threadPoolSize);

        for (int i = 0; i < threadPoolSize; i++) {
            int finalI = i;
            workerRunnables.add(() -> {
                try {
                    // Update the map, the runnable has started
                    executionMap.put(finalI, true);

                    // Simulating your API calls with different types of delay
                    Thread.sleep(3000);
                    if (finalI == 2) {
                        Thread.sleep(1000);
                    }

                    // Update the map, the runnable has finished
                    executionMap.put(finalI, false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        workerRunnables.forEach(worker -> scheduler.scheduleWithFixedDelay(worker, 0, 2, TimeUnit.SECONDS));

        Executors.newCachedThreadPool().execute(new Runnable() {
            ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

            @Override
            public void run() {
                scheduler.scheduleWithFixedDelay(new Runnable() {
                    @Override
                    public void run() {
                        for (int i = 0; i < threadPoolSize; i++) {
                            System.out.println("Runnable number " + i +" is running: " + executionMap.get(i));
                        }
                    }
                }, 0, 2, TimeUnit.SECONDS);
            }
        });
    }
}
...