Возвращаемые значения из потоков Java - PullRequest
18 голосов
/ 23 февраля 2010

У меня есть Java-тема, подобная следующей:

   public class MyThread extends Thread {
        MyService service;
        String id;
        public MyThread(String id) {
            this.id = node;
        }
        public void run() {
            User user = service.getUser(id)
        }
    }

У меня около 300 идентификаторов и каждые пару секунд - я запускаю потоки, чтобы сделать вызов для каждого идентификатора. Например.

for(String id: ids) {
    MyThread thread = new MyThread(id);
    thread.start();
}

Теперь я хотел бы собрать результаты из каждого потока и выполнить пакетную вставку в базу данных вместо 300 вставок базы данных каждые 2 секунды.

Есть идеи, как мне это сделать?

Ответы [ 9 ]

35 голосов
/ 23 февраля 2010

Канонический подход заключается в использовании Callable и ExecutorService. submit ting от Callable до ExecutorService возвращает (типобезопасное) Future, из которого вы можете get результат.

class TaskAsCallable implements Callable<Result> {
    @Override
    public Result call() {
        return a new Result() // this is where the work is done.
    }
}

ExecutorService executor = Executors.newFixedThreadPool(300);
Future<Result> task = executor.submit(new TaskAsCallable());
Result result = task.get(); // this blocks until result is ready

В вашем случае вы, вероятно, захотите использовать invokeAll, который возвращает List из Futures, или создать этот список самостоятельно, когда вы добавляете задачи исполнителю. Чтобы собрать результаты, просто наберите get для каждого.

18 голосов
/ 23 февраля 2010

Если вы хотите собрать все результаты перед обновлением базы данных, вы можете использовать метод invokeAll. Это обеспечивает учет, который потребуется, если вы отправляете задания по одному, как предлагает daveb .

private static final ExecutorService workers = Executors.newCachedThreadPool();

...

Collection<Callable<User>> tasks = new ArrayList<Callable<User>>();
for (final String id : ids) {
  tasks.add(new Callable<User>()
  {

    public User call()
      throws Exception
    {
      return svc.getUser(id);
    }

  });
}
/* invokeAll blocks until all service requests complete, 
 * or a max of 10 seconds. */
List<Future<User>> results = workers.invokeAll(tasks, 10, TimeUnit.SECONDS);
for (Future<User> f : results) {
  User user = f.get();
  /* Add user to batch update. */
  ...
}
/* Commit batch. */
...
4 голосов
/ 23 февраля 2010

Сохраните ваш результат в вашем объекте. Когда он завершится, пусть он попадет в синхронизированную коллекцию (на ум приходит синхронизированная очередь).

Если вы хотите собрать результаты для отправки, возьмите все из очереди и прочитайте результаты из объектов. Вы могли бы даже заставить каждый объект знать, как «публиковать» свои собственные результаты в базе данных, таким образом, различные классы могут быть отправлены, и все они обрабатываются с помощью одного и того же крошечного, элегантного цикла.

В JDK есть множество инструментов, которые могут помочь с этим, но это действительно легко, когда вы начинаете думать о вашем потоке как об истинном объекте, а не просто как о кучке дерьма вокруг метода "run". Как только вы начинаете думать об объектах, программирование становится намного проще и приятнее.

3 голосов
/ 12 мая 2016

В Java8 есть лучший способ сделать это, используя CompletableFuture . Скажем, у нас есть класс, который получает идентификатор из базы данных, для простоты мы можем просто вернуть число, как показано ниже,

static class GenerateNumber implements Supplier<Integer>{

    private final int number;

    GenerateNumber(int number){
        this.number = number;
    }
    @Override
    public Integer get() {
        try {
            TimeUnit.SECONDS.sleep(1);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        return this.number;
    }
}

Теперь мы можем добавить результат в параллельную коллекцию, когда будут готовы результаты каждого будущего.

Collection<Integer> results = new ConcurrentLinkedQueue<>();
int tasks = 10;
CompletableFuture<?>[] allFutures = new CompletableFuture[tasks];
for (int i = 0; i < tasks; i++) {
     int temp = i;
     CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()-> new GenerateNumber(temp).get(), executor);
     allFutures[i] = future.thenAccept(results::add);
 }

Теперь мы можем добавить обратный вызов, когда все фьючерсы готовы,

CompletableFuture.allOf(allFutures).thenAccept(c->{
   System.out.println(results); // do something with result
});
1 голос
/ 23 февраля 2010

Вы можете создать класс, который расширяет Observable. Затем ваш поток может вызвать метод в классе Observable, который уведомит любые классы, зарегистрированные в этом наблюдателе, путем вызова Observable.notifyObservers (Object).

Класс наблюдения будет реализовывать Observer и регистрироваться в Observable. Затем вы реализуете метод обновления (Observable, Object), который вызывается при вызове Observerable.notifyObservers (Object).

1 голос
/ 23 февраля 2010
public class TopClass {
     List<User> users = new ArrayList<User>();
     void addUser(User user) {
         synchronized(users) {
             users.add(user);
         }
     }
     void store() throws SQLException {
        //storing code goes here
     }
     class MyThread extends Thread {
            MyService service;
            String id;
            public MyThread(String id) {
                this.id = node;
            }
            public void run() {
                User user = service.getUser(id)
                addUser(user);
            }
        }
}
1 голос
/ 23 февраля 2010

Самый простой подход - передать объект каждому потоку (один объект на поток), который будет содержать результат позже. Основной поток должен хранить ссылку на каждый объект результата. Когда все потоки объединены, вы можете использовать результаты.

1 голос
/ 23 февраля 2010

Вы можете создать очередь или список, который вы передаете созданным потокам, потоки добавляют свой результат в список, который очищается потребителем, который выполняет пакетную вставку.

1 голос
/ 23 февраля 2010

Вам нужно сохранить результат в чем-то вроде синглтона. Это должно быть правильно синхронизировано.

РЕДАКТИРОВАТЬ : я знаю, что это не самый лучший совет, так как не рекомендуется обрабатывать raw Threads. Но, учитывая вопрос, это сработает, не так ли? Возможно, меня не проголосуют, но почему голосование против?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...