У меня есть следующий фрагмент кода, который в основном просматривает список задач, которые должны быть выполнены, и каждая задача затем передается исполнителю для выполнения.
, в свою очередь, создает другого исполнителя (для выполнения операций с базами данных ... чтения и записи данных в очередь) и выполняет задачу.
возвращает Future<Boolean>
для представленных задач. Когда одна из задач не выполняется, я хочу изящно прервать все потоки и завершить работу исполнителя, перехватывая все исключения. Какие изменения мне нужно сделать?
public class DataMovingClass {
private static final AtomicInteger uniqueId = new AtomicInteger(0);
private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator();
ThreadPoolExecutor threadPoolExecutor = null ;
private List<Source> sources = new ArrayList<Source>();
private static class IDGenerator extends ThreadLocal<Integer> {
public Integer get() {
return uniqueId.incrementAndGet();
public void init(){
// load sources list
public boolean execute() {
boolean succcess = true ;
threadPoolExecutor = new ThreadPoolExecutor(10,10,
10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("DataMigration-" + uniqueNumber.get());
return t;
}// End method
}, new ThreadPoolExecutor.CallerRunsPolicy());
List<Future<Boolean>> result = new ArrayList<Future<Boolean>>();
for (Source source : sources) {
result.add(threadPoolExecutor.submit(new JobExecutor(source)));
for (Future<Boolean> jobDone : result) {
try {
if (!jobDone.get(100000, TimeUnit.SECONDS) && success) {
// in case of successful DbWriterClass, we don't need to change
// it.
success = false;
} catch (Exception ex) {
// handle exceptions
public class JobExecutor implements Callable<Boolean> {
private ThreadPoolExecutor threadPoolExecutor ;
Source jobSource ;
public SourceJobExecutor(Source source) {
this.jobSource = source;
threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("Job Executor-" + uniqueNumber.get());
return t;
}// End method
}, new ThreadPoolExecutor.CallerRunsPolicy());
public Boolean call() throws Exception {
boolean status = true ;
System.out.println("Starting Job = " + jobSource.getName());
try {
// do the specified task ;
}catch (InterruptedException intrEx) {
logger.warn("InterruptedException", intrEx);
status = false ;
} catch(Exception e) {
logger.fatal("Exception occurred while executing task "+jobSource.getName(),e);
status = false ;
System.out.println("Ending Job = " + jobSource.getName());
return status ;