Обработка исключений CompletableFuture runAsyn c & thenRun - PullRequest
2 голосов
/ 28 января 2020

Допустим, у меня есть этот пример кода, и внутри runAsync обнаружено исключение. Мой вопрос заключается в том, будет ли это исключение препятствовать выполнению thenRun, поскольку thenRun выполняется в том же потоке, что и метод вызывающего этого кода.

private void caller() {
    CompletableFuture.runAsync(() -> {
          try {
              // some code
          } catch (Exception e) {
              throw new CustomException(errorMessage, e);
          }
         }, anInstanceOfTaskExecutor).thenRun(
         // thenRun code
     ));
}

Я уже прошел this поток и объясняет, как вы можете обрабатывать исключения, выданные из асинхронных блоков (т.е. путем блокировки и использования join). Я хочу знать, будет ли выполняться код внутри блока thenRun или нет, если CompletableFuture completesExceptionally.

Обновить :

Я выполнил некоторый код для проверки this:

CompletableFuture.runAsync(() -> {
      List<Integer> integerList = new ArrayList<>();
      integerList.get(1);    // throws exception
    }).thenRun(() -> {
      System.out.println("No exception occurred");
    });

Это ничего не печатает, и это означает, что исключение не «распространялось до / до» потока метода вызывающей стороны из асинхронного блока. Теперь я понимаю ожидаемое поведение, но у меня есть следующие вопросы:

  1. Почему он молча терпит неудачу, даже если CompletableFuture завершается в исключительном порядке?
  2. Как это работает в фоновом режиме?
  3. Это потому, что оба этих потока (поток вызывающего и асинхронный поток) имеют свое собственное пространство стека?

Ответы [ 2 ]

4 голосов
/ 09 февраля 2020

Общая информация

Документация CompletionStage объясняет общие правила интерфейса:

Этап возможно асинхронного вычисления, которое выполняет действие или вычисляет значение, когда другой CompletionStage завершается. Этап завершается после завершения его вычисления, но это, в свою очередь, может инициировать другие зависимые этапы. Функциональность, определенная в этом интерфейсе, принимает только несколько базовых c форм, которые расширяются до большего набора методов для захвата диапазона стилей использования:

  • Вычисления, выполняемые Этап может быть выражен как Function, Consumer или Runnable (с использованием методов с именами, включая apply , accept или run соответственно ) в зависимости от того, требует ли он аргументов и / или дает ли он результаты. Например:

    stage.thenApply(x -> square(x))
         .thenAccept(x -> System.out.print(x))
         .thenRun(() -> System.out.println());
    

    Дополнительная форма ( compose ) позволяет создавать конвейеры вычислений из функций, возвращающих этапы завершения.

    Любым аргументом для вычисления этапа является результат вычисления запускающего этапа.

  • Выполнение одного этапа может быть инициировано завершением одного этапа, или обоих двух этапов, или любого из двух этапов. Зависимости на одном этапе упорядочены с использованием методов с префиксом , затем . Те, кто вызван завершением обоих двух этапов, могут комбинировать свои результаты или эффекты, используя соответственно названные методы. Триггеры, вызванные или двух этапов, не дают никаких гарантий относительно того, какие из результатов или эффектов используются для вычисления зависимого этапа.

  • Зависимости между этапами контролируют запуск вычисления, но в противном случае не гарантирует какой-либо конкретный порядок. Кроме того, выполнение вычислений нового этапа может быть организовано любым из трех способов: выполнение по умолчанию, асинхронное выполнение по умолчанию (с использованием методов с суффиксом asyn c, которые используют средство асинхронного выполнения по умолчанию для этапа) или пользовательское (через прилагаемый Executor). Свойства выполнения режимов по умолчанию и asyn c задаются реализациями CompletionStage, а не этим интерфейсом. Методы с явными Executor аргументами могут иметь произвольные свойства выполнения и могут даже не поддерживать параллельное выполнение, но организованы для обработки способом, учитывающим асинхронность.

  • Две формы методов (handle и whenComplete) поддерживают безусловные вычисления независимо от того, завершена ли стадия запуска нормально или исключительно. Метод exceptionally поддерживает вычисления только в исключительных случаях, когда этап запуска завершается вычислением результата замены, аналогично ключевому слову java [sic] catch. Во всех других случаях, если вычисление этапа внезапно завершается с (непроверенным) исключением или ошибкой, тогда все зависимые этапы, требующие его завершения, также завершаются исключительно, с CompletionException, содержащим исключение в качестве причины. Если этап зависит от обоих двух этапов, и оба этапа выполнены исключительно, то CompletionException может соответствовать одному из этих исключений. Если этап зависит от либо от двух других, и только один из них завершается в исключительном порядке, никаких гарантий относительно того, завершается ли зависимый этап нормально или исключительно, не делается. В случае метода whenComplete, когда само предоставленное действие встречает исключение, тогда этап завершается исключительно с этим исключением, если этап источника также не завершается исключительно, в этом случае исключительное завершение этапа источника получает предпочтение и распространяется на зависимый этап.

Все методы соответствуют указанным выше спецификациям запуска, выполнения и исключительного завершения (которые не повторяются в отдельных спецификациях метода). [...]

[...]

И документация CompletableFuture объясняет правила потоков (и другие политики), где, как указано выше, некоторые из них оставлены до реализации CompletionStage:

A Future, который может быть явно завершен (установка его значения и статуса) и может использоваться как CompletionStage, поддерживающий зависимые функции и действия, которые запускаются после его завершения.

Когда два или другие потоки пытаются complete, completeExceptionally или cancel a CompletableFuture, только один из них преуспевает.

В дополнение к этим и связанным с ними методам для непосредственного управления состоянием и результатами, CompletableFuture реализует интерфейс CompletionStage со следующими политиками:

  • Действия, выполняемые для зависимых завершений не асинхронных c методов, могут выполняться потоком, завершающим текущую CompletableFuture, или любым другим вызывающим методом завершения.

  • Все асинхронные c методы без явного аргумента Executor выполняются с использованием ForkJoinPool.commonPool() (если это не так. pport уровень параллелизма не менее двух, и в этом случае создается новый Thread для выполнения каждой задачи). Это может быть переопределено для нестатистических c методов в подклассах путем определения метода defaultExecutor(). Для упрощения мониторинга, отладки и отслеживания все сгенерированные асинхронные задачи являются экземплярами интерфейса маркера CompletableFuture.AsynchronousCompletionTask. Операции с задержками могут использовать методы адаптера, определенные в этом классе, например: supplyAsync(supplier, delayedExecutor(timeout, timeUnit)). Для поддержки методов с задержками и тайм-аутами этот класс поддерживает не более одного потока демона для запуска и отмены действий, а не для их запуска.

  • Все методы CompletionStage реализованы независимо от других опубликованных c методов, поэтому на поведение одного метода не влияют переопределения других в подклассах.

  • Все CompletionStage методы возвращают CompletableFuture с. Чтобы ограничить использование только теми методами, которые определены в интерфейсе CompletionStage, используйте метод minimalCompletionStage(). Или чтобы убедиться, что клиенты сами не изменят будущее, используйте метод copy().

CompletableFuture, также реализующий Future со следующими политиками:

  • Поскольку (в отличие от FutureTask) этот класс не имеет прямого контроля над вычислением, которое приводит к его завершению, отмена рассматривается как еще одна форма исключительного завершения. Метод cancel имеет тот же эффект, что и completeExceptionally(new CancellationException()). Метод isCompletedExceptionally() может использоваться для определения, завершено ли CompletableFuture каким-либо исключительным образом.

  • В случае исключительного завершения с CompletionException, методы get() и get(long, TimeUnit) бросить ExecutionException по той же причине, что и соответствующая CompletionException. Чтобы упростить использование в большинстве контекстов, этот класс также определяет методы join() и getNow(T), которые вместо этого выдают CompletionException непосредственно в этих случаях.

[...]


Ваши вопросы

Вот ваш пример кода:

CompletableFuture.runAsync(() -> {
      List<Integer> integerList = new ArrayList<>();
      integerList.get(1);    // throws exception
    }).thenRun(() -> {
      System.out.println("No exception occurred");
    });

Если вы не знаете, такие методы, как thenRun, возвращают new CompletionStage. Таким образом, ваш код подобен следующему:

CompletableFuture<Void> runAsyncStage = CompletableFuture.runAsync(() -> List.of().get(0));
CompletableFuture<Void> thenRunStage =
    runAsyncStage.thenRun(() -> System.out.println("thenRun executing!"));

thenRunStage вызывается завершением runAsyncStage, которое, в этом случае, гарантированно завершается исключительно с IndexOutOfBoundsException. Что касается того, почему Runnable не выполняется, это из-за контракта CompletionStage#thenRun(Runnable):

Возвращает новый CompletionStage, который, когда этот этап завершается нормально , выполняет данное действие. См. Документацию CompletionStage для правил, касающихся исключительного завершения.

В связи с тем, что этап запуска завершается исключительно, этап thenRunStage также завершается исключительно, что означает, что Runnable пропущен.

1. «Почему он молча терпит неудачу, даже если CompletableFuture завершает работу исключительно?»

Пример кода эквивалентен проглатыванию исключения с помощью блока try-catch. Вы не видите исключение, потому что вы не написали код, который бы сообщал об исключении. Обе стадии runAsyncStage и thenRunStage были выполнены исключительно, последняя из-за того, что первая завершилась исключительно.

Если вы хотите знать об исключении "в цепочке" этапов, тогда вы должны использовать этапы, такие как exceptionally[Async], handle[Async] и whenComplete[Async]. Делая это таким образом, вы можете изменить поведение цепочки на основе нормального или исключительного завершения этапа триггера.

Если вы хотите знать об исключении «вне цепочки» этапов, то вам необходимо используйте такие методы, как join(), get() и get(long,TimeUnit). Если этап завершился исключительно, тогда первый бросит CompletionException, обертывающий причину сбоя, в то время как последние два выбросят ExecutionException, обертывающий причину сбоя.

2. «Как это работает в фоновом режиме?»

Реализация CompletableFuture слишком сложна, чтобы объяснить ее в ответе переполнения стека. Если вы хотите изучить реализацию, вы можете посмотреть на исходный код. Ваш JDK должен был прийти с src.zip файлом, содержащим исходные файлы Java. Вы также можете посмотреть исходный код онлайн в репозиториях OpenJDK . Например, вот исходный код JDK 13 CompletableFuture:

https://hg.openjdk.java.net/jdk/jdk13/file/0368f3a073a9/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java

3. «Это потому, что оба этих потока (поток вызывающего и асинхронный поток) имеют свое собственное пространство стека?»

Один поток не будет знать об исключении в другом потоке, если между двумя потоками не будет какой-то связи. Вызывающие методы, такие как join(), при необходимости передают исключение вызывающему потоку, который сгенерирует указанное исключение. Однако, как показывает ответ на ваш первый вопрос, это немного сложнее, чем это. Даже если поток выдает исключение в пределах одной стадии , вы не увидите трассировки стека или чего-либо подобного. Это связано с тем, что исключение составляет , перехвачено , а этап помечен как сбойный с этим исключением в качестве причины . Затем другой код должен явно извлекать и обрабатывать это исключение по мере необходимости.

Это ничем не отличается от использования ExecutorService и возвращаемых Future объектов. Задача может не сработать в фоновом режиме, но другой код не узнает об этом до тех пор, пока не будет запрошен Future.

От награды: «Я хочу понять детали взаимодействия потоков друг с другом . »

Я не уверен, что еще добавить. CompletionStage API - это абстракция «над» потоками. Вы просто указываете API, как вы хотите, чтобы выполнялась цепочка команд, включая пулы потоков, которые будут использоваться на каждом этапе, а реализация обрабатывает все межпотоковые коммуникации для вас. Тем не менее, каждый поток делает свое дело, просто API разработан, чтобы обеспечить более простой и реактивный способ взаимодействия между потоками. Если вас интересует как это реализовано, тогда я рекомендую изучить исходный код (ссылка выше).

2 голосов
/ 28 января 2020

Это будет зависеть от того, какой шаг вы добавляете exceptionally.

. В приведенном ниже случае он пропустит thenRun и напрямую выполнит исключительный блок.

CompletableFuture.runAsync(() -> { 
     //process and throw exception
     }, anInstanceOfTaskExecutor )
    .thenRun(() -> {})
    .exceptionally(exception -> {
      // do something, handle exception
    })
 ));

В этом case, он выполнит thenRun.

   CompletableFuture.runAsync(() -> { 
     //process and throw exception
     }, anInstanceOfTaskExecutor )
    .exceptionally(exception -> {
      // do something, handle exception
    })
    .thenRun(() -> {})
 ));

Надеюсь, это поможет

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...