Как правильно генерировать и обрабатывать исключения при использовании Flux из projectreactor - PullRequest
1 голос
/ 06 марта 2019

Я использую io.projectreactor 3 (ядро реактора 3.2.6.RELEASE), и я заметил некоторые несоответствия, касающиеся обработки ошибок.К сожалению, официальная документация не дает достаточно подробностей для решения моих проблем.

У меня есть следующие 4 фрагмента.В некоторых случаях исключение будет игнорироваться, а в других случаях оно будет добавлено.Как на самом деле генерировать и потреблять исключения?

Фрагмент 1

В этом случае исключение будет проигнорировано, и main () завершится без получения исключения.

import reactor.core.publisher.Flux;

class Scratch {
    public static void main(String[] args) throws Throwable {
        Flux.push(sink -> {
            sink.next(1);
            sink.next(2);
        }).doOnNext(e -> {
            throw new RuntimeException("HELLO WORLD");
        }).subscribe(System.out::println, e -> {
            throw new RuntimeException(e);
        });
        System.out.println("DONE");
    }
}

Вывод:

DONE

Фрагмент 2

Аналогичен примеру выше, за исключением того, что мы не используем Flux.pushно флюс. просто.Main () получит исключение.

import reactor.core.publisher.Flux;

class Scratch {
    public static void main(String[] args) throws Throwable {
        Flux.just(
                1
        ).doOnNext(e -> {
            throw new RuntimeException("HELLO WORLD");
        }).subscribe(System.out::println, e -> {
            throw new RuntimeException(e);
        });
        System.out.println("DONE");
    }
}

Выход:

Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: HELLO WORLD
    at Scratch.lambda$main$1(scratch_15.java:10)
...

Фрагмент 3

Мы сообщаем об исключении, вызывая sink.error.Main () не получит исключение.

import reactor.core.publisher.Flux;

class Scratch {
    public static void main(String[] args) throws Throwable {
        Flux.push(sink -> {
            sink.next(1);
            sink.next(2);
            sink.error(new RuntimeException("HELLO WORLD"));
        }).subscribe(System.out::println, e -> {
            throw new RuntimeException(e);
        });
        System.out.println("DONE");
    }
}

Вывод:

1
2
DONE

Фрагмент 4

Мы генерируем исключение напрямую.Main () получит исключение.

import reactor.core.publisher.Flux;

class Scratch {
    public static void main(String[] args) throws Throwable {
        Flux.push(sink -> {
            sink.next(1);
            sink.next(2);
            throw new RuntimeException("HELLO WORLD");
        }).subscribe(System.out::println, e -> {
            throw new RuntimeException(e);
        });
        System.out.println("DONE");
    }
}

Выход

1
2
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: HELLO WORLD
    at Scratch.lambda$main$1(scratch_15.java:10)
...

Как правильно обрабатывать исключение при работе с реактивным ядром?Единственный надежный способ, по-видимому, вообще не использовать функцию обратного вызова с ошибкой, а вместо этого использовать объемный поток подписаться с помощью try / catch.Но в этом случае я всегда получаю UnsupportedOperationException вместо исходного исключения, а затем мне нужно использовать Exceptions.isErrorCallbackNotImplemented, чтобы проверить, происходит ли оно из реактивного, извлечь вложенное исключение и затем выбросить его.

Это можно сделатьконечно, но это должно быть сделано последовательно на каждом месте, где мы используем Flux, на который подписаны.Это не выглядит хорошо для меня.Что мне здесь не хватает?

1 Ответ

2 голосов
/ 07 марта 2019

Во всех ваших примерах проблема заключается в перебрасывании лямбды обработки ошибок .subscribe(...).

Если вы хотите, чтобы исключение было выброшено в основной блок, используйте block() вариантов.

Если вы хотите проверить, распространяется ли ошибка по всему конвейеру, используйте StepVerifier.create(pipeline).expectError(...).verify().

.subscribe в общем случае - получение уведомлений о состоянии «терминала», не предназначенное для восстановления или повторного выброса ошибок (для этого используйте onError* операторы восходящего направления).

Примеры на основе just, кажется, правильно распространяют исключение, потому что они не выполняют предоставленный пользователем код при подписке, поэтому во время subscribe(Consumer<Throwable>).

не выполняется попытка / отлов.

push, как и generate / create / defer и compose, выполняют определенную пользователем логику (Consumer<FluxSink>) при подписке. Они защищают от целых Consumer исключений и пытаются распространять это (как onError сигнал), а не напрямую выбрасывают.

Но если сбой в Consumer вызван при выполнении одного из методов sink, это может быть проблематично, если subscriber перебрасывает: мы вводим рекурсию, где вызов приемника вызывает приемник , Мы защищаем от этого бесконечного случая путем выхода при обнаружении рекурсивного слива раковины.

Именно поэтому примеры на основе push, которые вызывают ошибку после sink.next или в sink.error (примеры 1 и 3), не приводят к исключению в основном:

  1. Consumer применяется
  2. sink.next вызывается и следующий оператор создает исключение 1, или sink.error вызывается
  3. исключение 1 достигает subscribe и перебрасывается как исключение 2
  4. это короткое замыкание Consumer.apply, исключение 2 передается sink.error
  5. раковина уже вызывается, поэтому мы вырываемся, чтобы избежать бесконечной рекурсии
  6. исключение 2 никогда не видели

В примере 4, с другой стороны, мы больше не находимся в процессе вызова методов приемника, и исходное исключение сначала не достигает подписчика:

  1. Consumer применяется
  2. прямо генерирует исключение 1
  3. это короткое замыкание Consumer.apply и исключение 1 передается в sink.error
  4. который распространяется на подписку
  5. , который перебрасывает его как исключение 2
  6. исключение 2 видно в основном методе
...