Почему метод doOnCancel блокирует одноименный doOnCancel, даже если они находятся на разных планировщиках? - PullRequest
0 голосов
/ 15 апреля 2020

Я играл с отменами в проекте-реакторе и столкнулся с таким странным поведением: когда в одной цепочке издателей есть два родственных doOnCancel метода, и они отменяют в разных планировщиках, первый запускается (что всегда кажется быть последним) блокирует работающий позже. Может ли кто-нибудь объяснить это поведение?

Например, когда я запускаю этот блок кода

        Disposable disposable = Mono.fromCallable(() -> "A")
                .doOnCancel(() -> log.info("Cancel A"))
                .cancelOn(Schedulers.newElastic("Can-Sched-A"))
                .doOnCancel(() -> {
                    log.info("Cancel B");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                })
                .cancelOn(Schedulers.newElastic("Can-Sched-B"))
                .delayElement(Duration.ofSeconds(1))
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe(System.out::println);

        Thread.sleep(100);
        disposable.dispose();
        Thread.sleep(5000);

Мы получаем следующий вывод, в котором мы видим, как журнал Cancel A появляется через две секунды после Cancel B:

2020-04-15 11:16:26 [Can-Sched-B-4] INFO  somePackage.SomeTestClass - Cancel B
2020-04-15 11:16:28 [Can-Sched-A-5] INFO  somePackage.SomeTestClass - Cancel A

1 Ответ

0 голосов
/ 16 апреля 2020

Это ожидаемое поведение. Здесь нужно понять одну важную вещь: все операции объединены в цепочку, и каждый оператор в конвейере по сути является подписчиком предыдущего оператора, который действует как издатель.

    Disposable disposable = Mono.fromCallable(() -> "A")
            .doOnCancel(() -> log.info("Cancel A"))              -- 5
            .cancelOn(Schedulers.newElastic("Can-Sched-A"))      -- 4
            .doOnCancel(() -> {                                  -- 3
                log.info("Cancel B");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            })
            .cancelOn(Schedulers.newElastic("Can-Sched-B"))       -- 2
            .delayElement(Duration.ofSeconds(1))
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe(System.out::println);                      -- 1
  • Последний оператор подписки это то, что заставляет топ-издателя испускать элемент. Подписка пересылает подписку к предыдущему восходящему каналу. Он продолжает идти вверх и достигает главного издателя и испускает элемент.
  • Когда вы отменяете подписку, он снова направляется вверх снизу вверх. Похоже, это работает задом наперед.
  • Обратите внимание, что даже если вы вызываете утилизацию, издатель может продолжать отправлять элементы до тех пор, пока он не получит уведомление об отмене. Например: вы делаете некоторую операцию блокировки Thread.sleep(2000) в конвейере. В течение этого времени, если у вас был поток, он мог испустить несколько элементов в течение этого времени, так как он все еще не знает об отмене, поскольку он должен достичь снизу вверх.
  • Относительно поведения блокировки, Планировщики здесь просто используются для переключения контекста / использования некоторых выделенных пулов потоков для выполнения операции. Каждый оператор в цепочке будет выполнять операцию, только если предыдущая операция завершена.
...