Пул потоков, обрабатывающий «дублирующие» задачи - PullRequest
7 голосов
/ 18 января 2012

Я хочу выполнять несколько различных задач параллельно, но у меня есть идея, что если задача уже поставлена ​​в очередь или обрабатывается в настоящий момент, она не будет поставлена ​​в очередь.Я немного прочитал о Java API и нашел код ниже, который, кажется, работает.Кто-нибудь может пролить свет на то, является ли метод, который я использую, лучшим подходом.Какие-нибудь опасности (безопасность потоков?) Или лучшие способы сделать это?Код выглядит следующим образом:

import java.util.HashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestExecution implements Runnable {
   String key1;
   String key2;   
   static HashMap<TestExecution, Future<?>> executions = new HashMap<TestExecution, Future<?>>();
   static LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
   static ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, q);

   public static void main(String[] args) {
      try {
         execute(new TestExecution("A", "A"));
         execute(new TestExecution("A", "A"));
         execute(new TestExecution("B", "B"));
         Thread.sleep(8000);
         execute(new TestExecution("B", "B"));
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
   }

   static boolean execute(TestExecution e) {
      System.out.println("Handling "+e.key1+":"+e.key2);
      if (executions.containsKey(e)) {
         Future<?> f = (Future<?>) executions.get(e);
         if (f.isDone()) {
            System.out.println("Previous execution has completed");
            executions.remove(e);
         } else {
            System.out.println("Previous execution still running");
            return false;
         }         
      }
      else {
         System.out.println("No previous execution");
      }
      Future<?> f = tpe.submit(e);
      executions.put(e, f);            
      return true;
   }

   public TestExecution(String key1, String key2) {
      this.key1 = key1;
      this.key2 = key2;      
   }

   public boolean equals(Object obj)
   {
       if (obj instanceof TestExecution)
       {
          TestExecution t = (TestExecution) obj;
           return (key1.equals(t.key1) && key2.equals(t.key2));           
       }       
       return false;
   }

   public int hashCode ()
   {
      return key1.hashCode()+key2.hashCode();
   }

   public void run() {      
      try {
         System.out.println("Start processing "+key1+":"+key2);
         Thread.sleep(4000);
         System.out.println("Finish processing "+key1+":"+key2);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }      
   }              
}

Следуйте комментариям ниже:
План состоит в том, что запуск задач для выполнения будет выполняться cron, вызывающим веб-сервис RESTful.Например, ниже приведена настройка для одной задачи, запускаемой в 9:30 каждый день, плюс еще одной, запланированной каждые две минуты.

0/2 * * * * restclient.pl key11 key12 
30 09 * * * restclient.pl key21 key22

В этом случае, если задание key11: key12 запущено или уже в очереди на выполнение, я не хочу ставить в очередь другой экземпляр.Я понимаю, что у нас есть другие варианты планирования, однако мы склонны использовать cron для других задач, поэтому я хочу сохранить это.

Второе обновление.В ответ на комментарии к настоящему моменту я переписал код, не могли бы вы прокомментировать какие-либо проблемы со следующим обновленным решением?

import java.util.concurrent.LinkedBlockingQueue;

public class TestExecution implements Runnable {
   String key1;
   String key2;      
   static TestThreadPoolExecutor tpe = new TestThreadPoolExecutor(new LinkedBlockingQueue<Runnable>());

   public static void main(String[] args) {
      try {
         tpe.execute(new TestExecution("A", "A"));
         tpe.execute(new TestExecution("A", "A"));
         tpe.execute(new TestExecution("B", "B"));
         Thread.sleep(8000);
         tpe.execute(new TestExecution("B", "B"));
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
   }

   public TestExecution(String key1, String key2) {
      this.key1 = key1;
      this.key2 = key2;      
   }

   public boolean equals(Object obj)
   {
       if (obj instanceof TestExecution)
       {
          TestExecution t = (TestExecution) obj;
           return (key1.equals(t.key1) && key2.equals(t.key2));           
       }       
       return false;
   }

   public int hashCode ()
   {
      return key1.hashCode()+key2.hashCode();
   }

   public void run() {      
      try {
         System.out.println("Start processing "+key1+":"+key2);
         Thread.sleep(4000);
         System.out.println("Finish processing "+key1+":"+key2);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }      
   }
}


import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class TestThreadPoolExecutor extends ThreadPoolExecutor {
   Set<Runnable> executions = Collections.synchronizedSet(new HashSet<Runnable>());

   public TestThreadPoolExecutor(LinkedBlockingQueue<Runnable> q) {      
      super(2, 5, 1, TimeUnit.MINUTES, q);      
   }

   public void execute(Runnable command) {
      if (executions.contains(command)) {
         System.out.println("Previous execution still running");
         return;
      }
      else {
         System.out.println("No previous execution");
      }
      super.execute(command);      
      executions.add(command);      
   }

   protected void afterExecute(Runnable r, Throwable t) {
      super.afterExecute(r, t);        
      executions.remove(r);
   }      
}

Ответы [ 2 ]

3 голосов
/ 18 января 2012

Вот как бы я справился и избежал дубликатов

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.*;

public class TestExecution implements Callable<Void> {
    private static final ThreadPoolExecutor TPE = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
    private static final Set<TestExecution> TE_SET = Collections.newSetFromMap(new ConcurrentHashMap<TestExecution, Boolean>());

    private final String key1;
    private final String key2;

    public static void main(String... args) throws InterruptedException {
        new TestExecution("A", "A").execute();
        new TestExecution("A", "A").execute();
        new TestExecution("B", "B").execute();
        Thread.sleep(8000);
        new TestExecution("A", "A").execute();
        new TestExecution("B", "B").execute();
        new TestExecution("B", "B").execute();
        TPE.shutdown();
    }

    public TestExecution(String key1, String key2) {
        this.key1 = key1;
        this.key2 = key2;
    }

    void execute() {
        if (TE_SET.add(this)) {
            System.out.println("Handling " + this);
            TPE.submit(this);
        } else {
            System.out.println("... ignoring duplicate " + this);
        }
    }

    public boolean equals(Object obj) {
        return obj instanceof TestExecution &&
                key1.equals(((TestExecution) obj).key1) &&
                key2.equals(((TestExecution) obj).key2);
    }

    public int hashCode() {
        return key1.hashCode() * 31 + key2.hashCode();
    }

    @Override
    public Void call() throws InterruptedException {
        if (!TE_SET.remove(this)) {
            System.out.println("... dropping duplicate " + this);
            return null;
        }
        System.out.println("Start processing " + this);
        Thread.sleep(4000);
        System.out.println("Finish processing " + this);
        return null;
    }

    public String toString() {
        return key1 + ':' + key2;
    }
}

отпечатков

Handling A:A
... ignoring duplicate A:A
Handling B:B
Start processing A:A
Start processing B:B
Finish processing A:A
Finish processing B:B
Handling A:A
Handling B:B
Start processing A:A
Start processing B:B
... ignoring duplicate B:B
Finish processing B:B
Finish processing A:A
2 голосов
/ 18 января 2012

Пара комментариев:

  • в методе execute вы получите условие гонки между чтением «executeings» (containsKey) и записью (remove или put), если несколькопотоки вызывают этот метод одновременно.Вы должны обернуть все вызовы в «исполнения», которые должны быть атомарными в синхронизированном блоке.(В вашем случае будет работать синхронизированный метод) http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
  • Вы должны обрабатывать состояние с помощью одноэлементных, а не статических (то есть глобальных) переменных

Но я бы очень хотелузнать немного больше о вашем дизайне, чтобы понять, чего вы пытаетесь достичь.Почему задача ставится в очередь на выполнение несколько раз?

...