Хотя это старый вопрос, я хотел опубликовать свой ответ, так как недавно искал только это решение. Можно использовать DelayQueue за ThreadPoolExecutor, просто нужно немного обернуть код. Хитрость заключается в том, чтобы заставить DelayQueue представить себя как BlockingQueue.
Я начал с определения интерфейса DR
, который расширяет как Runnable
, так и Delayed
. Обратите внимание, что статические методы здесь создают экземпляры DR (класс Instance не показан).
public interface DR extends Delayed, Runnable {
public static DR make( Runnable r )
{
if (r instanceof DR)
{
return (DR)r;
}
Impl impl = new Impl(r);
if (r instanceof Delayed)
{
impl.expires = ((Delayed) r).getDelay( TimeUnit.MILLISECONDS );
}
return impl;
}
public static DR make( Runnable r, long expires )
{
if (r instanceof DR)
{
if (expires == ((DR)r).getDelay( TimeUnit.MILLISECONDS ))
{
return (DR)r;
}
}
return new Impl(r, expires);
}
}
Реализации должны переопределять: public int compareTo(Delayed o)
, public boolean equals( Object o )
и public int hashCode()
.
Создать класс, расширяющий DelayQueue. Этот класс добавляет единственный метод, который представляет DelayQuue как BlockingQueue. Возвращаемый класс просто оборачивает DelayQueue и использует методы make
интерфейса DR
для преобразования из Runnable
в DR
, где это необходимо.
public class DelayedBlockingQueue extends DelayQueue<DR> {
public BlockingQueue<Runnable> asRunnableQueue() {
return new BlockingQueue<Runnable>(){
DelayedBlockingQueue dbq = DelayedBlockingQueue.this;
public boolean add(Runnable e) {
return dbq.add( DR.make( e ));
}
private List<DR> makeList( Collection<? extends Runnable> coll)
{
return coll.stream().map( r -> DR.make( r ) ).collect( Collectors.toList() ) ;
}
public boolean addAll(Collection<? extends Runnable> arg0) {
return dbq.addAll(makeList( arg0 ) );
}
public void clear() {
dbq.clear();
}
public boolean contains(Object o) {
if (o instanceof Runnable) {
return dbq.contains( DR.make( (Runnable)o ) );
}
return false;
}
public boolean containsAll(Collection<?> arg0) {
List<DR> lst = new ArrayList<DR>();
for (Object o : arg0)
{
if (o instanceof Runnable)
{
lst.add( DR.make( (Runnable)o ) );
}
else {
return false;
}
}
return dbq.containsAll( lst );
}
public int drainTo(Collection<? super Runnable> c, int maxElements) {
return dbq.drainTo( c, maxElements );
}
public int drainTo(Collection<? super Runnable> c) {
return dbq.drainTo( c );
}
public Runnable element() {
return dbq.element();
}
public void forEach(Consumer<? super Runnable> arg0) {
dbq.forEach( arg0 );
}
public boolean isEmpty() {
return dbq.isEmpty();
}
public Iterator<Runnable> iterator() {
return WrappedIterator.create( dbq.iterator() ).mapWith( dr -> (Runnable)dr );
}
public boolean offer(Runnable e, long timeout, TimeUnit unit) throws InterruptedException {
return dbq.offer( DR.make( e ), timeout, unit );
}
public boolean offer(Runnable e) {
return dbq.offer( DR.make( e ) );
}
public Runnable peek() {
return dbq.peek();
}
public Runnable poll() {
return dbq.poll();
}
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
return dbq.poll( timeout, unit );
}
public void put(Runnable e) throws InterruptedException {
dbq.put( DR.make(e) );
}
public int remainingCapacity() {
return dbq.remainingCapacity();
}
public Runnable remove() {
return dbq.remove();
}
public boolean remove(Object o) {
if (o instanceof Runnable)
{
return dbq.remove( DR.make( (Runnable)o) );
}
return false;
}
public boolean removeAll(Collection<?> arg0) {
List<DR> lst = new ArrayList<DR>();
for (Object o : arg0)
{
if (o instanceof Runnable)
{
lst.add( DR.make( (Runnable)o ) );
}
}
return dbq.removeAll( lst );
}
public boolean retainAll(Collection<?> arg0) {
return dbq.retainAll( arg0 );
}
public int size() {
return dbq.size();
}
public Runnable take() throws InterruptedException {
return dbq.take();
}
public Object[] toArray() {
return dbq.toArray();
}
public <T> T[] toArray(T[] arg0) {
return dbq.toArray( arg0 );
}
};
}
Чтобы использовать решение, создайте DelayedBlockingQueue
и используйте метод asRunnableQueue()
, чтобы передать работоспособную очередь в конструктор ThreadPoolExecutor.
DelayedBlockingQueue queue = new DelayedBlockingQueue();
ThreadPoolExecutor execService = new ThreadPoolExecutor( 1, 5, 30, TimeUnit.SECONDS, queue.asRunnableQueue() );