Как реализовать PriorityBlockingQueue с ThreadPoolExecutor и пользовательские задачи - PullRequest
24 голосов
/ 23 августа 2010

Я много искал, но не смог найти решение своей проблемы.

У меня есть свой собственный класс BaseTask, который использует ThreadPoolExecutor для обработки задач.
Если я не хочу расставлять приоритеты (то есть, используя LinkedBlockingQueue), это работает просто отлично, нокогда я пытаюсь использовать PriorityBlockingQueue, я получаю ClassCastException, потому что ThreadPoolExecutor оборачивает мои Задачи в FutureTask объект.
Это, очевидно, нормально, потому что FutureTask не реализует Comparable, но какпродолжу ли я решать проблему приоритета?
Я читал, что вы можете переопределить newTaskFor в ThreadPoolExecutor, но я не могу найти этот метод вообще ...?

Буду очень признателен за любые предложения!

Какой-нибудь код, чтобы помочь:

В моем BaseTask классе у меня есть

private static final BlockingQueue<Runnable> sWorkQueue = new PriorityBlockingQueue<Runnable>();

private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);

    public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
    }
};

private static final BaseThreadPoolExecutor sExecutor = new BaseThreadPoolExecutor(
    1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, sWorkQueue, sThreadFactory);

private final BaseFutureTask<Result> mFuture;

public BaseTask(int priority) {
    mFuture = new BaseFutureTask<Result>(mWorker, priority);
}

public final BaseTask<Params, Progress, Result> execute(Params... params) {

    /* Some unimportant code here */

    sExecutor.execute(mFuture);
}

В BaseFutureTask class

@Override
public int compareTo(BaseFutureTask another) {
    long diff = this.priority - another.priority;

    return Long.signum(diff);
}

В BaseThreadPoolExecutor class i переопределяет 3 submit методы ...
Вызывается конструктор в этом классе, но ни один из submit методов

Ответы [ 6 ]

15 голосов
/ 30 марта 2011
public class ExecutorPriority {

public static void main(String[] args) {

    PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new ComparePriority());

    Executor exe = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, pq);
    exe.execute(new RunWithPriority(2) {

        @Override
        public void run() {

            System.out.println(this.getPriority() + " started");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println(this.getPriority() + " finished");
        }
    });
    exe.execute(new RunWithPriority(10) {

        @Override
        public void run() {
            System.out.println(this.getPriority() + " started");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println(this.getPriority() + " finished");
        }
    });

}

private static class ComparePriority<T extends RunWithPriority> implements Comparator<T> {

    @Override
    public int compare(T o1, T o2) {
        return o1.getPriority().compareTo(o2.getPriority());
    }
}

}

Как вы можете догадаться, RunWithPriority - это абстрактный класс, который является Runnable и имеет поле приоритета Integer

9 голосов
/ 16 мая 2013

Вы можете использовать эти вспомогательные классы:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get();
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

И

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

И этот вспомогательный метод:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

И затем используйте его так:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}
4 голосов
/ 12 декабря 2011

Мое решение:

public class XThreadPoolExecutor extends ThreadPoolExecutor
{
    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory, RejectedExecutionHandler handler)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
    {
        return new ComparableFutureTask<>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
    {
        return new ComparableFutureTask<>(callable);
    }

    protected class ComparableFutureTask<V>
        extends FutureTask<V> implements Comparable<ComparableFutureTask<V>>
    {
        private Object object;
        public ComparableFutureTask(Callable<V> callable)
        {
            super(callable);
            object = callable;
        }

        public ComparableFutureTask(Runnable runnable, V result)
        {
            super(runnable, result);
            object = runnable;
        }

        @Override
        @SuppressWarnings("unchecked")
        public int compareTo(ComparableFutureTask<V> o)
        {
            if (this == o)
            {
                return 0;
            }
            if (o == null)
            {
                return -1; // high priority
            }
            if (object != null && o.object != null)
            {
                if (object.getClass().equals(o.object.getClass()))
                {
                    if (object instanceof Comparable)
                    {
                        return ((Comparable) object).compareTo(o.object);
                    }
                }
            }
            return 0;
        }
    }
}
2 голосов
/ 24 января 2016

Я попытаюсь объяснить эту проблему с помощью полнофункционального кода. Но прежде чем углубляться в код, я хотел бы объяснить, что такое PriorityBlockingQueue

PriorityBlockingQueue : PriorityBlockingQueue является реализацией BlockingQueue. Он принимает задачи вместе с их приоритетом и передает задачу с наивысшим приоритетом для выполнения в первую очередь. Если какие-либо две задачи имеют одинаковый приоритет, нам нужно предоставить некоторую настраиваемую логику, чтобы решить, какая задача идет первой.

Теперь давайте сразу перейдем к коду.

Класс драйвера : Этот класс создает исполнителя, который принимает задачи, а затем отправляет их на выполнение. Здесь мы создаем две задачи: одну с низким приоритетом, а другую с высоким приоритетом. Здесь мы говорим исполнителю запустить MAX из 1 потоков и использовать PriorityBlockingQueue.

     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);

    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
}

MyTask class : MyTask реализует Runnable и принимает приоритет в качестве аргумента в конструкторе. Когда эта задача выполняется, она печатает сообщение, а затем переводит поток в спящий режим на 1 секунду.

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

MyFutureTask class : Поскольку мы используем PriorityBlocingQueue для хранения наших задач, наши задачи должны быть заключены в FutureTask, а наша реализация FutureTask должна реализовывать Comparable интерфейс. Интерфейс Comparable сравнивает приоритет двух разных задач и отправляет задачу с наивысшим приоритетом на выполнение.

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

Класс приоритета : Самоочевидный класс приоритета.

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

Теперь, когда мы запустим этот пример, мы получим следующий вывод

The following Runnable is getting executed High
The following Runnable is getting executed Low

Несмотря на то, что мы сначала отправили приоритет LOW, но позже задачу HIGH priority, но поскольку мы используем PriorityBlockingQueue, сначала будет выполняться задача с более высоким приоритетом.

0 голосов
/ 30 сентября 2012

Чтобы ответить на ваш вопрос: метод newTaskFor() находится в суперклассе ThreadPoolExecutor, AbstractExecutorService.Вы можете просто переопределить его в ThreadPoolExecutor, однако.

0 голосов
/ 23 августа 2010

Похоже, они оставили это вне гармонии апача. Примерно год назад svn commit log исправляет отсутствие newTaskFor. Возможно, вы можете просто переопределить функции submit в расширенном ThreadPoolExecutor, чтобы создать расширенный FutureTask, то есть Comparable. Они не очень длинные .

...