Я хочу выполнять несколько различных задач параллельно, но у меня есть идея, что если задача уже поставлена в очередь или обрабатывается в настоящий момент, она не будет поставлена в очередь.Я немного прочитал о Java API и нашел код ниже, который, кажется, работает.Кто-нибудь может пролить свет на то, является ли метод, который я использую, лучшим подходом.Какие-нибудь опасности (безопасность потоков?) Или лучшие способы сделать это?Код выглядит следующим образом:
import java.util.HashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestExecution implements Runnable {
String key1;
String key2;
static HashMap<TestExecution, Future<?>> executions = new HashMap<TestExecution, Future<?>>();
static LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
static ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, q);
public static void main(String[] args) {
try {
execute(new TestExecution("A", "A"));
execute(new TestExecution("A", "A"));
execute(new TestExecution("B", "B"));
Thread.sleep(8000);
execute(new TestExecution("B", "B"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static boolean execute(TestExecution e) {
System.out.println("Handling "+e.key1+":"+e.key2);
if (executions.containsKey(e)) {
Future<?> f = (Future<?>) executions.get(e);
if (f.isDone()) {
System.out.println("Previous execution has completed");
executions.remove(e);
} else {
System.out.println("Previous execution still running");
return false;
}
}
else {
System.out.println("No previous execution");
}
Future<?> f = tpe.submit(e);
executions.put(e, f);
return true;
}
public TestExecution(String key1, String key2) {
this.key1 = key1;
this.key2 = key2;
}
public boolean equals(Object obj)
{
if (obj instanceof TestExecution)
{
TestExecution t = (TestExecution) obj;
return (key1.equals(t.key1) && key2.equals(t.key2));
}
return false;
}
public int hashCode ()
{
return key1.hashCode()+key2.hashCode();
}
public void run() {
try {
System.out.println("Start processing "+key1+":"+key2);
Thread.sleep(4000);
System.out.println("Finish processing "+key1+":"+key2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Следуйте комментариям ниже:
План состоит в том, что запуск задач для выполнения будет выполняться cron, вызывающим веб-сервис RESTful.Например, ниже приведена настройка для одной задачи, запускаемой в 9:30 каждый день, плюс еще одной, запланированной каждые две минуты.
0/2 * * * * restclient.pl key11 key12
30 09 * * * restclient.pl key21 key22
В этом случае, если задание key11: key12 запущено или уже в очереди на выполнение, я не хочу ставить в очередь другой экземпляр.Я понимаю, что у нас есть другие варианты планирования, однако мы склонны использовать cron для других задач, поэтому я хочу сохранить это.
Второе обновление.В ответ на комментарии к настоящему моменту я переписал код, не могли бы вы прокомментировать какие-либо проблемы со следующим обновленным решением?
import java.util.concurrent.LinkedBlockingQueue;
public class TestExecution implements Runnable {
String key1;
String key2;
static TestThreadPoolExecutor tpe = new TestThreadPoolExecutor(new LinkedBlockingQueue<Runnable>());
public static void main(String[] args) {
try {
tpe.execute(new TestExecution("A", "A"));
tpe.execute(new TestExecution("A", "A"));
tpe.execute(new TestExecution("B", "B"));
Thread.sleep(8000);
tpe.execute(new TestExecution("B", "B"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public TestExecution(String key1, String key2) {
this.key1 = key1;
this.key2 = key2;
}
public boolean equals(Object obj)
{
if (obj instanceof TestExecution)
{
TestExecution t = (TestExecution) obj;
return (key1.equals(t.key1) && key2.equals(t.key2));
}
return false;
}
public int hashCode ()
{
return key1.hashCode()+key2.hashCode();
}
public void run() {
try {
System.out.println("Start processing "+key1+":"+key2);
Thread.sleep(4000);
System.out.println("Finish processing "+key1+":"+key2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestThreadPoolExecutor extends ThreadPoolExecutor {
Set<Runnable> executions = Collections.synchronizedSet(new HashSet<Runnable>());
public TestThreadPoolExecutor(LinkedBlockingQueue<Runnable> q) {
super(2, 5, 1, TimeUnit.MINUTES, q);
}
public void execute(Runnable command) {
if (executions.contains(command)) {
System.out.println("Previous execution still running");
return;
}
else {
System.out.println("No previous execution");
}
super.execute(command);
executions.add(command);
}
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
executions.remove(r);
}
}