Однопоточность задачи без постановки в очередь дальнейших запросов - PullRequest
9 голосов
/ 10 февраля 2011

У меня есть требование, чтобы задача выполнялась асинхронно при отбрасывании любых дальнейших запросов, пока задача не будет завершена.

Синхронизация метода просто ставит задачи в очередь и не пропускает.Сначала я думал использовать SingleThreadExecutor, но он также ставит задачи в очередь.Затем я посмотрел на ThreadPoolExecutor, но он читает очередь, чтобы получить задачу, которая должна быть выполнена, и, следовательно, будет иметь одну выполняемую задачу и минимум одну задачу в очереди (другие могут быть удалены с помощью ThreadPoolExecutor.DiscardPolicy).

Единственное, о чем я могу подумать, - это использовать семафор для блокировки очереди.Я пришел со следующим примером, чтобы показать, чего я пытаюсь достичь.Есть ли более простой способ?Я что-то упустил очевидное?

import java.util.concurrent.*;

public class ThreadPoolTester {
    private static ExecutorService executor = Executors.newSingleThreadExecutor();
    private static Semaphore processEntry = new Semaphore(1);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            kickOffEntry(i);

            Thread.sleep(200);
        }

        executor.shutdown();
    }

    private static void kickOffEntry(final int index) {
        if (!processEntry.tryAcquire()) return;
        executor.
            submit(
                new Callable<Void>() {
                    public Void call() throws InterruptedException {
                        try {
                            System.out.println("start " + index);
                            Thread.sleep(1000); // pretend to do work
                            System.out.println("stop " + index);
                            return null;

                        } finally {
                            processEntry.release();
                        }
                    }
                }
            );
    }
}

Пример вывода

start 0
stop 0
start 5
stop 5
start 10
stop 10
start 15
stop 15

Принятие ответа axtavt и преобразование приведенного выше примера дает следующее более простое решение.

import java.util.concurrent.*;

public class SyncQueueTester {
    private static ExecutorService executor = new ThreadPoolExecutor(1, 1, 
            1000, TimeUnit.SECONDS, 
            new SynchronousQueue<Runnable>(),
            new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            kickOffEntry(i);

            Thread.sleep(200);
        }

        executor.shutdown();
    }

    private static void kickOffEntry(final int index) {
        executor.
            submit(
                new Callable<Void>() {
                    public Void call() throws InterruptedException {
                        System.out.println("start " + index);
                        Thread.sleep(1000); // pretend to do work
                        System.out.println("stop " + index);
                        return null;
                    }
                }
            );
    }
}

Ответы [ 2 ]

11 голосов
/ 10 февраля 2011

Похоже, что исполнитель, поддерживаемый SynchronousQueue с желаемой политикой, делает то, что вы хотите:

executor = new ThreadPoolExecutor(
    1, 1, 
    1000, TimeUnit.SECONDS, 
    new SynchronousQueue<Runnable>(),
    new ThreadPoolExecutor.DiscardPolicy());
0 голосов
/ 10 февраля 2011

если нет очереди, нет необходимости в исполнителе, я бы сказал. использование одного семафора кажется достаточно. Я использую код ниже, чтобы избежать запуска того же кода, когда он уже запущен. просто убедитесь, что semaphore равен static volatile, что делает семафор единственным семафором для класса и передает ссылку на семафор в кучу других потоков, как только он изменяется

if (this.getSemaphore().tryAcquire()) {
        try {
            process();
        } catch (Exception e) {
        } finally {
            this.getSemaphore().release();
        }
}
else {
    logger.info(">>>>> Job already running, skipping go");
}
...