Лучший способ обработки необработанных исключений в задачах / действиях ForkJoinPool - PullRequest
0 голосов
/ 02 февраля 2012

Как лучше обрабатывать исключения (необработанные) при использовании ForkJoinPool для отправки задач (RecursiveAction или RecursiveTask)?

ForkJoinPool принимает Thread.UncaughtExceptionHandler для обработки исключений, когда WorkerThread завершается внезапно (что в любом случае не находится под нашим контролем), но этот обработчик не используется, когда ForkJoinTask выдает исключение.В своей реализации я использую стандартный submit / invokeAll.

Вот мой сценарий:

У меня есть темаработает в бесконечном цикле чтения данных из сторонней системы.В этой теме я отправляю Задачи в ForkJoinPool

new Thread() {
      public void run() {
         while (true) {
             ForkJoinTask<Void> uselessReturn = 
                   ForkJoinPool.submit(RecursiveActionTask);
         }
      }
 }

Я использую RecursiveAction и в некоторых случаях RecursiveTask.Эти задачи передаются в FJPool с использованием метода submit().Я хочу иметь общий обработчик исключений, подобный UncaughtExceptionHandler, где, если Задача выдает непроверенное / неперехваченное исключение, я могу обработать исключение и повторно отправить задачу, если требуется.Обработка исключения также гарантирует, что поставленные в очередь задачи не будут отменены, если одна или несколько задач выдают исключение.

invokeAll() метод возвращает набор ForkJoinTasks, но эти задачи находятся в рекурсивном блоке (каждая задача вызывает метод compute() и может быть разделена далее [гипотетический сценарий])

class RecursiveActionTask extends RecursiveAction {

    public void compute() {
       if <task.size() <= ACCEPTABLE_SIZE) {
          processTask() // this might throw an checked/unchecked exception
       } else {
          RecursiveActionTask[] splitTasks = splitTasks(tasks)
          RecursiveActionTasks returnedTasks = invokeAll(splitTasks);
          // the below code never executes as invokeAll submits the tasks to the pool 
          // and the flow never comes to the code below.
          // I am looking for some handling like this
          for (RecusiveActionTask task : returnedTasks) {
             if (task.isDone()) {
                task.getException() // handle this exception
             }
          }
       }
    }

}

Я заметил, что при сбое 3-4 задач весь блок отправки очереди отбрасывается.В настоящее время я поставил try/catch вокруг ProcessTask, который лично мне не нравится.Я ищу более общие.

  1. Я также хочу знать обо всем списке задач, которые не удалось, чтобы я мог повторно отправить их
  2. Когда задачи выдают исключения, делаютпотоки вытесняются из пула (хотя мой анализ обнаружил, что они не [но не уверены])?
  3. Вызов метода get() в FutureTask с большей вероятностью поместит мой поток в последовательное состояние, поскольку он ожидает, пока задача не завершится.
  4. Я хочу знать статус задания, только если оно не выполнено.Мне все равно, когда он завершится (очевидно, не хочет ждать час спустя)

Есть идеи, как обрабатывать исключения в приведенном выше сценарии?

Ответы [ 2 ]

2 голосов
/ 13 апреля 2012

Это я показываю, мы решили, что в Акке:

/**
 * INTERNAL AKKA USAGE ONLY
 */
final class MailboxExecutionTask(mailbox: Mailbox) extends ForkJoinTask[Unit] {
  final override def setRawResult(u: Unit): Unit = ()
  final override def getRawResult(): Unit = ()
  final override def exec(): Boolean = try { mailbox.run; true } catch {
    case anything ⇒
      val t = Thread.currentThread
      t.getUncaughtExceptionHandler match {
        case null ⇒
        case some ⇒ some.uncaughtException(t, anything)
      }
      throw anything
  }
 }
0 голосов
/ 13 мая 2018

@ Rajendra, Вы все делаете правильно, за исключением того, что вы должны использовать ForkJoinPool execute () вместо submit ().

Таким образом, если Runnable дает сбой, он вызовет исключение рабочего, и оно будет перехвачено вашим UncaughtExceptionHandler.

Я не знаю, почему существует такое поведение, но оно будет работать!Я усвоил это сложным путем: (

Из кода Java 8:
submit использует AdaptedRunnableAction ().
execute использует RunnableExecuteAction () (см. Rethrow ,) ).

 /**
 * Adaptor for Runnables without results
 */
static final class AdaptedRunnableAction extends ForkJoinTask<Void>
    implements RunnableFuture<Void> {
    final Runnable runnable;
    AdaptedRunnableAction(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    public final void run() { invoke(); }
    private static final long serialVersionUID = 5232453952276885070L;
}

/**
 * Adaptor for Runnables in which failure forces worker exception
 */
static final class RunnableExecuteAction extends ForkJoinTask<Void> {
    final Runnable runnable;
    RunnableExecuteAction(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    void internalPropagateException(Throwable ex) {
        rethrow(ex); // rethrow outside exec() catches.
    }
    private static final long serialVersionUID = 5232453952276885070L;
}
...