ThreadpoolExecutor с возможностью повторных попыток и выключения после сбоя задачи несколько раз - PullRequest
0 голосов
/ 14 мая 2019

Мне нужен исполнитель пула потоков, который должен выполнить точное количество (одинаковых) задач.

Он должен иметь возможность повторно отправлять невыполненные задачи n раз.Если какая-либо из задач завершится неудачей более чем на n, то пул потоков должен завершиться и не продолжать обрабатывать другие задачи.

Я попытался объединить 2 подхода, которые я нашел в разных ответах - один для повторного-отправка невыполненных задач путем переопределения ThreadPoolExecutor.afterExecute и создания подкласса CountDownLatch так, чтобы потоки, ожидающие на защелке, прерывались, а исполнитель выключался.

Пока это подклассовая защелка обратного отсчета:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AbortableCountDownLatch extends CountDownLatch {
protected boolean aborted = false;

public AbortableCountDownLatch(int count) {
    super(count);
}

/**
 * Unblocks all threads waiting on this latch and cause them to receive an
 * AbortedException.  If the latch has already counted all the way down,
 * this method does nothing.
 */
public void abort() {
    if( getCount() == 0 )
        return;

    this.aborted = true;
    while(getCount() > 0)
        countDown();
}

@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    final boolean rtrn = super.await(timeout,unit);
    if (aborted)
        throw new AbortedException();
    return rtrn;
}

@Override
public void await() throws InterruptedException {
    super.await();
    if (aborted)
        throw new AbortedException();
}


public static class AbortedException extends InterruptedException {
    public AbortedException() {
    }

    public AbortedException(String detailMessage) {
        super(detailMessage);
    }
}
}

И исполнитель пула потоков:

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

private static final int RETRY_LIMIT = 3;

private Map<Runnable, Integer> retriedTasks = new ConcurrentHashMap<>();
private AbortableCountDownLatch latch;

public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                            TimeUnit unit, BlockingQueue<Runnable> workQueue, AbortableCountDownLatch latch) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    this.latch = latch;
}

@Override
public void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    // If submit() method is called instead of execute()
    if (t == null && r instanceof Future<?>) {
        try {
            Object result = ((Future<?>) r).get();
        } catch (CancellationException e) {
            t = e;
        } catch (ExecutionException e) {
            t = e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    if (t != null) {
        retriedTasks.put(r, retriedTasks.getOrDefault(r, 0) + 1);
        System.out.println("Retries for " + r + " -> " + retriedTasks.get(r));
        /* check to see if we have retried this task too many times, if so - shutdown */
        if (retriedTasks.containsKey(r) && retriedTasks.get(r) > RETRY_LIMIT) {
            System.err.println("Thread failed for more than " + RETRY_LIMIT + " times, aborting everything..");
            this.latch.abort();
        } else {
            System.err.println("Thread threw  exception " + t.getMessage() + ". Retry-ing task...");
            execute(r);
        }
    } else {
        /* clear any previous retry count for this runnable */
        retriedTasks.remove(r);
    }
}
}

И главный будет использовать их так:

import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MainProcessor {

public static void main(String[] args) {

    AbortableCountDownLatch latch = new AbortableCountDownLatch(5);
    ThreadPoolExecutor threadPoolExecutor = new MyThreadPoolExecutor(8, 8, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), latch);

    for (int i = 0; i < 5; i++) {
        threadPoolExecutor.submit(() -> {
            System.out.println("Started thread " + Thread.currentThread().getName());
            Random random = new Random();
            try {
                Thread.sleep(random.nextInt(7000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
                if (random.nextBoolean()){
                    System.err.println("Thread " + Thread.currentThread().getName() + " failed - throwing exception..");
                    throw new RuntimeException("Thread " + Thread.currentThread().getName() + "failed! spectacularly :!");
                }
                else {
                    System.out.println("Thread " + Thread.currentThread().getName() + " finished.");
                    latch.countDown();
                }
        });
    }

    try {
        latch.await();
    } catch (InterruptedException e) {
        threadPoolExecutor.shutdownNow();
    }

    threadPoolExecutor.shutdown();
}
}

Выглядит ли этот подход правильным?Мне не особо нравится, что защелка должна передаваться как исполнителю пула потоков, так и реальному Runnable.Есть ли стандартный способ достижения этого?Я тоже в порядке с версией Scala.

Я видел других, которые предлагали, чтобы задачи снова отправлялись в пул в случае сбоя, но это не кажется хорошей идеей, поскольку задача должна отвечать только за реальную логику выполнения., а не детали исполнения.

1 Ответ

3 голосов
/ 14 мая 2019

Вы можете использовать Task-Wrapper, который делает эту работу, тогда это будет довольно просто:

public class TaskWrapper implements Runnable
{
    private Runnable task;
    private int maxResubmits;
    private ThreadPoolExecutor executor;
    private CountDownLatch latch;

    public TaskWrapper(Runnable task, int maxResubmits, ThreadPoolExecutor executor, CountDownLatch latch) {
        this.task=task;
        this.maxResubmits=maxResubmits;
        this.executor=executor;
        this.latch=latch;
        executor.submit(this);
    }

    public void run() {
        try {
            task.run();
            latch.countdoun();
        }
        catch(Exception e) {
            maxResubmits--;
            if(maxResubmits>0)
                executor.submit(this);
            else
            {
                latch.countdoun();
                executor.shutdownNow()
            }                
        }
    }
}

Теперь вам нужно только создать защелку, вызвать ваши задачи и затем дождаться выполнения:


List<Runnable> tasks;
int maxResubmits;

CountDownLatch latch=new CountDownLatch(tasks.size());

tasks.forEach(task->new TaskWrapper(task,maxResubmits,executor,latch));

latch.await();

if(!executor.isShutdown())
    executor.shutdown();
...