Как остановить Runnable, запланированный для повторного выполнения после определенного количества выполнений - PullRequest
56 голосов
/ 01 сентября 2011

Ситуация

У меня есть Runnable.У меня есть класс, который планирует этот Runnable для выполнения с использованием ScheduledExecutorService с scheduleWithFixedDelay .

Цель

Я хочу изменить этот класс, чтобы запланировать Runnable для выполнения с фиксированной задержкой либо на неопределенный срок, или до тех пор, пока он не будет выполнен определенное количество раз, в зависимости от какого-либо параметра, передаваемого в конструктор.

Если возможно, я бы хотелиспользовать тот же Runnable, поскольку концептуально это то же самое, что и «run».

Возможные подходы

Подход # 1

Имеют два Runnable, один из которых отменяетрасписание после ряда выполнений (которых он хранит в счетчике) и без:

public class MyClass{
    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public enum Mode{
        INDEFINITE, FIXED_NO_OF_TIMES
    }

    public MyClass(Mode mode){
        if(mode == Mode.INDEFINITE){
            scheduler.scheduleWithFixedDelay(new DoSomethingTask(), 0, 100, TimeUnit.MILLISECONDS);
        }else if(mode == Mode.FIXED_NO_OF_TIMES){
            scheduler.scheduleWithFixedDelay(new DoSomethingNTimesTask(), 0, 100, TimeUnit.MILLISECONDS);
        }
    }

    private class DoSomethingTask implements Runnable{
        @Override
        public void run(){
            doSomething();
        }
    }

    private class DoSomethingNTimesTask implements Runnable{
        private int count = 0;

        @Override
        public void run(){
            doSomething();
            count++;
            if(count > 42){
                // Cancel the scheduling.
                // Can you do this inside the run method, presumably using
                // the Future returned by the schedule method? Is it a good idea?
            }
        }
    }

    private void doSomething(){
        // do something
    }
}

Я бы предпочел просто один Runnable для выполнения метода doSomething.Привязка планирования к Runnable кажется неправильной.Что вы думаете по этому поводу?

Подход № 2

Есть один Runnable для выполнения кода, который мы хотим периодически запускать.Иметь отдельный запланированный runnable, который проверяет, сколько раз запускался первый Runnable, и отменяет, когда он достигает определенной суммы.Это может быть неточным, так как это будет асинхронным.Это кажется немного громоздким.Что вы думаете об этом?

Подход № 3

Расширьте ScheduledExecutorService и добавьте метод scheduleWithFixedDelayNTimes.Возможно, такой класс уже существует?В настоящее время я использую Executors.newSingleThreadScheduledExecutor();, чтобы получить свой экземпляр ScheduledExecutorService.Я бы, вероятно, должен был реализовать аналогичную функциональность для создания экземпляра расширенного ScheduledExecutorService.Это может быть сложно.Что вы думаете об этом?

Нет подхода к планировщику [Правка]

Я не смог использовать планировщик.Вместо этого я мог бы получить что-то вроде:

for(int i = 0; i < numTimesToRun; i++){
    doSomething();
    Thread.sleep(delay);
}

И запустить это в каком-то потоке.Что вы думаете об этом?Вы потенциально можете по-прежнему использовать runnable и вызывать метод run напрямую.


Любые предложения приветствуются.Я ищу дебаты, чтобы найти «лучший метод» для достижения моей цели.

Ответы [ 7 ]

61 голосов
/ 04 сентября 2011

Вы можете использовать метод cancel () в Future. Из javadocs scheduleAtFixedRate

Otherwise, the task will only terminate via cancellation or termination of the executor

Вот пример кода, который оборачивает Runnable в другой, который отслеживает число раз, когда был выполнен оригинал, и отменяется после выполнения N раз.

public void runNTimes(Runnable task, int maxRunCount, long period, TimeUnit unit, ScheduledExecutorService executor) {
    new FixedExecutionRunnable(task, maxRunCount).runNTimes(executor, period, unit);
}

class FixedExecutionRunnable implements Runnable {
    private final AtomicInteger runCount = new AtomicInteger();
    private final Runnable delegate;
    private volatile ScheduledFuture<?> self;
    private final int maxRunCount;

    public FixedExecutionRunnable(Runnable delegate, int maxRunCount) {
        this.delegate = delegate;
        this.maxRunCount = maxRunCount;
    }

    @Override
    public void run() {
        delegate.run();
        if(runCount.incrementAndGet() == maxRunCount) {
            boolean interrupted = false;
            try {
                while(self == null) {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
                self.cancel(false);
            } finally {
                if(interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    public void runNTimes(ScheduledExecutorService executor, long period, TimeUnit unit) {
        self = executor.scheduleAtFixedRate(this, 0, period, unit);
    }
}
8 голосов
/ 01 сентября 2011

Цитируется из описания API (ScheduledExecutorService.scheduleWithFixedDelay):

Создает и выполняет периодическое действие, которое становится активным сначала после заданной начальной задержки, а затем с заданнымзадержка между прекращением одного исполнения и началом следующего. Если при выполнении какого-либо задания возникает исключение, последующее выполнение будет подавлено. В противном случае задание будет прервано только при отмене или прекращении исполнителя.

Итак, самая простая вещьбудет «просто выбросить исключение» (даже если это считается плохой практикой):

static class MyTask implements Runnable {

    private int runs = 0;

    @Override
    public void run() {
        System.out.println(runs);
        if (++runs >= 20)
            throw new RuntimeException();
    }
}

public static void main(String[] args) {
    ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
    s.scheduleWithFixedDelay(new MyTask(), 0, 100, TimeUnit.MILLISECONDS);
}
5 голосов
/ 11 сентября 2011

Пока что решение sbridges кажется наиболее чистым, за исключением того, что вы упомянули, что оно возлагает ответственность за обработку количества выполнений на Runnable.Это не должно быть связано с этим, вместо этого повторения должны быть параметром класса, обрабатывающего планирование.Чтобы достичь этого, я бы предложил следующий дизайн, который вводит новый класс executor для Runnables.Класс предоставляет два открытых метода для планирования задач, которые являются стандартными Runnables, с конечным или бесконечным повторением.При желании то же самое Runnable может быть передано для конечного и бесконечного планирования (что невозможно со всеми предлагаемыми решениями, которые расширяют класс Runnable для обеспечения конечных повторений).Обработка отмены конечных повторений полностью инкапсулирована в классе планировщика:

class MaxNScheduler
{

  public enum ScheduleType 
  {
     FixedRate, FixedDelay
  }

  private ScheduledExecutorService executorService =
     Executors.newSingleThreadScheduledExecutor();

  public ScheduledFuture<?> scheduleInfinitely(Runnable task, ScheduleType type, 
    long initialDelay, long period, TimeUnit unit)
  {
    return scheduleNTimes(task, -1, type, initialDelay, period, unit);
  }

  /** schedule with count repetitions */
  public ScheduledFuture<?> scheduleNTimes(Runnable task, int repetitions, 
    ScheduleType type, long initialDelay, long period, TimeUnit unit) 
  {
    RunnableWrapper wrapper = new RunnableWrapper(task, repetitions);
    ScheduledFuture<?> future;
    if(type == ScheduleType.FixedDelay)
      future = executorService.scheduleWithFixedDelay(wrapper, 
         initialDelay, period, TimeUnit.MILLISECONDS);
    else
      future = executorService.scheduleAtFixedRate(wrapper, 
         initialDelay, period, TimeUnit.MILLISECONDS);
    synchronized(wrapper)
    {
       wrapper.self = future;
       wrapper.notify(); // notify wrapper that it nows about it's future (pun intended)
    }
    return future;
  }

  private static class RunnableWrapper implements Runnable 
  {
    private final Runnable realRunnable;
    private int repetitions = -1;
    ScheduledFuture<?> self = null;

    RunnableWrapper(Runnable realRunnable, int repetitions) 
    {
      this.realRunnable = realRunnable;
      this.repetitions = repetitions;
    }

    private boolean isInfinite() { return repetitions < 0; }
    private boolean isFinished() { return repetitions == 0; }

    @Override
    public void run()
    {
      if(!isFinished()) // guard for calls to run when it should be cancelled already
      {
        realRunnable.run();

        if(!isInfinite())
        {
          repetitions--;
          if(isFinished())
          {
            synchronized(this) // need to wait until self is actually set
            {
              if(self == null)
              {
                 try { wait(); } catch(Exception e) { /* should not happen... */ }
              }
              self.cancel(false); // cancel gracefully (not throwing InterruptedException)
            }
          }
        }
      }
    }
  }

}

Чтобы быть справедливым, логика управления повторениями все еще с a Runnable, но это 'a Runnable полностью внутренний по отношению к MaxNScheduler, в то время как задача Runnable, переданная для планирования, не должна заботиться о природе планирования.Также эта проблема может быть легко перенесена в планировщик при желании, обеспечивая некоторый обратный вызов каждый раз, когда RunnableWrapper.run выполняется.Это немного усложнит код и приведет к необходимости сохранения некоторой карты RunnableWrapper s и соответствующих повторений, поэтому я решил оставить счетчики в классе RunnableWrapper.

Я также добавилнекоторая синхронизация на обертке при настройке самостоятельно.Это необходимо, поскольку теоретически, когда выполнение завершается, возможно, что self еще не было назначено (вполне теоретический сценарий, но возможен только 1 повтор).

Отмена обрабатывается изящно, без выброса InterruptedExceptionи если до выполнения отмены запланирован еще один раунд, RunnableWrapper не вызовет базовый Runnable.

1 голос
/ 11 июля 2019

Для случаев использования, таких как опрос до определенного времени ожидания, мы можем подойти с более простым решением, используя Future.get().

/* Define task */
public class Poll implements Runnable {
    @Override
    public void run() {
        // Polling logic
    }
}

/* Create executor service */
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);

/* Schedule task - poll every 500ms */
ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new Poll(), 0, 500, TimeUnit.MILLISECONDS);

/* Wait till 60 sec timeout */
try {
    future.get(60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    scheduledFuture.cancel(false);
    // Take action on timeout
}
1 голос
/ 04 сентября 2011

Вот мое предложение (я считаю, что оно обрабатывает все случаи, упомянутые в вопросе):

public class RepeatedScheduled implements Runnable {

    private int repeatCounter = -1;
    private boolean infinite;

    private ScheduledExecutorService ses;
    private long initialDelay;
    private long delay;
    private TimeUnit unit;

    private final Runnable command;
    private Future<?> control;

    public RepeatedScheduled(ScheduledExecutorService ses, Runnable command,
        long initialDelay, long delay, TimeUnit unit) {

        this.ses = ses;
        this.initialDelay = initialDelay;
        this.delay = delay;
        this.unit = unit;

        this.command = command;
        this.infinite = true;

    }

    public RepeatedScheduled(ScheduledExecutorService ses, Runnable command,
        long initialDelay, long delay, TimeUnit unit, int maxExecutions) {

        this(ses, command, initialDelay, delay, unit);
        this.repeatCounter = maxExecutions;
        this.infinite = false;

    }

    public Future<?> submit() {

        // We submit this, not the received command
        this.control = this.ses.scheduleWithFixedDelay(this,
            this.initialDelay, this.delay, this.unit);

        return this.control;

    }

    @Override
    public synchronized void run() {

        if ( !this.infinite ) {
            if ( this.repeatCounter > 0 ) {
                this.command.run();
                this.repeatCounter--;
            } else {
                this.control.cancel(false);
            }
        } else {
            this.command.run();
        }

    }

}

Кроме того, оно позволяет внешней стороне остановить все из Future, возвращенного submit() method.

Использование:

Runnable MyRunnable = ...;
// Repeat 20 times
RepeatedScheduled rs = new RepeatedScheduled(
    MySes, MyRunnable, 33, 44, TimeUnit.SECONDS, 20);
Future<?> MyControl = rs.submit();
...
1 голос
/ 01 сентября 2011

Ваш первый подход кажется нормальным.Вы могли бы объединить оба типа запускаемых объектов, передав объект mode его конструктору (или передать -1 в качестве максимального числа раз, которое он должен запускаться), и использовать этот режим, чтобы определить, нужно ли отменять выполняемый объект:

private class DoSomethingNTimesTask implements Runnable{
    private int count = 0;
    private final int limit;

    /**
     * Constructor for no limit
     */
    private DoSomethingNTimesTask() {
        this(-1);
    }

    /**
     * Constructor allowing to set a limit
     * @param limit the limit (negative number for no limit)
     */
    private DoSomethingNTimesTask(int limit) {
        this.limit = limit;
    }

    @Override
    public void run(){
        doSomething();
        count++;
        if(limit >= 0 && count > limit){
            // Cancel the scheduling
        }
    }
}

Вам придется передать запланированное будущее вашей задаче, чтобы оно само отменилось, или вы можете вызвать исключение.

0 голосов
/ 15 марта 2019

Я искал точно такую ​​же функциональность и выбрал org.springframework.scheduling.Trigger.

Ниже приведен рабочий пример полного теста (извините, если в коде слишком много данных) applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:task="http://www.springframework.org/schema/task"
 xmlns:util="http://www.springframework.org/schema/util"
 xmlns:context="http://www.springframework.org/schema/context"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context/ http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util/ http://www.springframework.org/schema/util/spring-util.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">

    <bean id="blockingTasksScheduler" class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
        <property name="poolSize" value="10" />
    </bean>

    <task:scheduler id="deftaskScheduler" pool-size="10" />

</beans>

JAVA

package com.alz.springTests.schedulerTest;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

public class ScheduledTest {

    private static ApplicationContext applicationContext;
    private static TaskScheduler taskScheduler;

    private static final class SelfCancelableTask implements Runnable, Trigger {
        Date creationTime = new Date();
        AtomicInteger counter = new AtomicInteger(0);
        private volatile boolean shouldStop = false;
        private int repeatInterval = 3; //seconds

        @Override
        public void run() {
            log("task: run started");

            // simulate "doing job" started
            int sleepTimeMs = ThreadLocalRandom.current().nextInt(500, 2000+1);
            log("will sleep " + sleepTimeMs + " ms");
            try {
                Thread.sleep(sleepTimeMs);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // "doing job" finished

            int i = counter.incrementAndGet();
            if (i > 5) { //cancel myself
                logErr("Attempts exceeded, will mark as shouldStop");
                shouldStop = true;

            } else {
                log("task: executing cycle #"+i);
            }
        }

        @Override
        public Date nextExecutionTime(TriggerContext triggerContext) {
            log("nextExecutionTime: triggerContext.lastActualExecutionTime() " + triggerContext.lastActualExecutionTime());
            log("nextExecutionTime: triggerContext.lastCompletionTime() " + triggerContext.lastCompletionTime());
            log("nextExecutionTime: triggerContext.lastScheduledExecutionTime() " + triggerContext.lastScheduledExecutionTime());

            if (shouldStop) 
                return null;

            if (triggerContext.lastCompletionTime() == null) {
                LocalDateTime ldt = creationTime.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime().plus(repeatInterval, ChronoUnit.SECONDS);
                return Date.from(ldt.atZone(ZoneId.systemDefault()).toInstant());
            } else {
                LocalDateTime ldt = triggerContext.lastCompletionTime().toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime().plus(repeatInterval, ChronoUnit.SECONDS);
                return Date.from(ldt.atZone(ZoneId.systemDefault()).toInstant());               
            }

        }

    }

    private static void log(String log) {
        System.out.printf("%s [%s] %s\r\n", LocalDateTime.now(), Thread.currentThread(), log);
    }

    private static void logErr(String log) {
        System.err.printf("%s [%s] %s\r\n", LocalDateTime.now(), Thread.currentThread(), log);
    }

    public static void main(String[] args) {

        log("main: Stated...");

        applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");

        taskScheduler = (TaskScheduler) applicationContext.getBean("blockingTasksScheduler");

        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = ((ThreadPoolTaskScheduler)taskScheduler).getScheduledThreadPoolExecutor();

        SelfCancelableTask selfCancelableTask = new SelfCancelableTask();
        taskScheduler.schedule(selfCancelableTask, selfCancelableTask);


        int waitAttempts = 0;
        while (waitAttempts < 30) {
            log("scheduledPool pending tasks: " + scheduledThreadPoolExecutor.getQueue().size());

            try {
                Thread.sleep(1*1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            waitAttempts++;

        }

        log("main: Done!");


    }

}
...