Promise.cache Ratpack с несколькими последующими обещаниями в ParallelBatch - PullRequest
0 голосов
/ 12 июня 2018

Я сталкиваюсь с NullPointerException в кишечнике Ratpack при использовании Ratpack's Promise.cache в сочетании с многочисленными нисходящими обещаниями и ParallelBatch, и из документации мне не ясно,мое использование некорректно, или если это представляет ошибку в Ratpack.

Вот сокращенный тестовый пример, который демонстрирует проблему:

@Test
public void foo() throws Exception {
    List<Promise<Integer>> promises = new ArrayList<>();

    for (int i = 0; i < 25; i++) {
        Promise<Integer> p = Promise.value(12);
        p = p.cache();
        promises.add(p.map(v -> v + 1));
        promises.add(p.map(v -> v + 2));
    }

    final List<Integer> results = ExecHarness.yieldSingle(c ->
            ParallelBatch.of(promises).yield()
    ).getValueOrThrow();
}

Выполнение этого теста 10000 раз локально приводит к частоте отказовоколо 10/10000, с NullPointerException, который выглядит следующим образом:

java.lang.NullPointerException
    at ratpack.exec.internal.CachingUpstream.yield(CachingUpstream.java:93)
    at ratpack.exec.internal.CachingUpstream.tryDrain(CachingUpstream.java:65)
    at ratpack.exec.internal.CachingUpstream.lambda$connect$0(CachingUpstream.java:116)
    at ratpack.exec.internal.CachingUpstream$$Lambda$58/1438461739.connect(Unknown Source)
    at ratpack.exec.internal.DefaultExecution.lambda$null$2(DefaultExecution.java:122)
    at ratpack.exec.internal.DefaultExecution$$Lambda$33/2092087501.execute(Unknown Source)
    at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
    at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
    at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
    at ratpack.exec.internal.DefaultExecController$1.lambda$start$0(DefaultExecController.java:195)
    at ratpack.exec.internal.DefaultExecController$1$$Lambda$7/1411892748.call(Unknown Source)
    at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread$0(DefaultExecController.java:136)
    at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory$$Lambda$8/1157058691.run(Unknown Source)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:745)

Если не использовать cache в этом тестовом примере, проблема исчезнет, ​​так как не подписывается на каждое кэшированное обещание дважды.

Мой вопрос: это неправильное использование API Ratpack, или это ошибка в фреймворке?Если первое, можете ли вы указать мне на что-то в документах, что объясняет, почему это использование неправильно?

1 Ответ

0 голосов
/ 14 июня 2018

Несмотря на то, что ваш пример не является лучшим вариантом использования для кэширования обещаний (повторное создание и обещание кэширования, которое содержит одно и то же значение для каждого шага итерации, не имеет особого смысла), вы на самом деле обнаружили ошибку условия гонки в CachingUpstream<T> класс.

Я провел несколько экспериментов, чтобы выяснить, что происходит, и вот мои выводы.Во-первых, я создал обещание со значением 12, которое обеспечивает пользовательскую (более детальную) реализацию CachingUpstream<T> объекта.Я взял тело Promise.value(12) и переопределил встроенный метод cacheResultIf(Predicate<? super ExecResult<T>> shouldCache), который по умолчанию возвращает CachingUpstream<T> экземпляр:

Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
        continuation.resume(() -> down.success(12))
)) {
    @Override
    public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
        return transform(up -> {
            return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
        });
    }
};

Далее я создал класс TestCachingUpstream<T> просто скопировав тело исходного класса, и я добавил несколько вещей, например

  • Я сделал каждые TestCachingUpstream<T> с внутренним идентификатором (случайным UUID), чтобы упростить отслеживание выполнения обещания.
  • Я добавил несколько подробных сообщений журнала, когда во время выполнения обещания произошли определенные события.

Я не изменил реализацию методов, я просто хотел отследить поток выполнения исохранить оригинальную реализацию как есть.Мой пользовательский класс выглядел так:

private static class TestCachingUpstream<T> implements Upstream<T> {
    private final String id = UUID.randomUUID().toString();

    private Upstream<? extends T> upstream;

    private final Clock clock;
    private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
    private final Function<? super ExecResult<T>, Duration> ttlFunc;

    private final AtomicBoolean pending = new AtomicBoolean();
    private final AtomicBoolean draining = new AtomicBoolean();
    private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();

    public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
        this(upstream, ttl, Clock.systemUTC());
    }

    @VisibleForTesting
    TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
        this.upstream = upstream;
        this.ttlFunc = ttl;
        this.clock = clock;
    }

    private void tryDrain() {
        if (draining.compareAndSet(false, true)) {
            try {
                TestCachingUpstream.Cached<? extends T> cached = ref.get();
                if (needsFetch(cached)) {
                    if (pending.compareAndSet(false, true)) {
                        Downstream<? super T> downstream = waiting.poll();

                        System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);

                        if (downstream == null) {
                            pending.set(false);
                        } else {
                            try {
                                yield(downstream);
                            } catch (Throwable e) {
                                System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
                                receiveResult(downstream, ExecResult.of(Result.error(e)));
                            }
                        }
                    }
                } else {
                    System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
                    Downstream<? super T> downstream = waiting.poll();
                    while (downstream != null) {
                        downstream.accept(cached.result);
                        downstream = waiting.poll();
                    }
                }
            } finally {
                draining.set(false);
            }
        }

        if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
            tryDrain();
        }
    }

    private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
        return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
    }

    private void yield(final Downstream<? super T> downstream) throws Exception {
        System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
        upstream.connect(new Downstream<T>() {
            public void error(Throwable throwable) {
                System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
                receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
            }

            @Override
            public void success(T value) {
                System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
                receiveResult(downstream, ExecResult.of(Result.success(value)));
            }

            @Override
            public void complete() {
                System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
                receiveResult(downstream, CompleteExecResult.get());
            }
        });
    }

    @Override
    public void connect(Downstream<? super T> downstream) throws Exception {
        TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
        if (needsFetch(cached)) {
            Promise.<T>async(d -> {
                waiting.add(d);
                tryDrain();
            }).result(downstream::accept);
        } else {
            downstream.accept(cached.result);
        }
    }

    private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
        Duration ttl = Duration.ofSeconds(0);
        try {
            ttl = ttlFunc.apply(result);
        } catch (Throwable e) {
            if (result.isError()) {
                //noinspection ThrowableResultOfMethodCallIgnored
                result.getThrowable().addSuppressed(e);
            } else {
                result = ExecResult.of(Result.error(e));
            }
        }

        Instant expiresAt;
        if (ttl.isNegative()) {
            expiresAt = null; // eternal
            System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
            upstream = null; // release
        } else if (ttl.isZero()) {
            expiresAt = clock.instant().minus(Duration.ofSeconds(1));
        } else {
            expiresAt = clock.instant().plus(ttl);
        }

        ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
        pending.set(false);

        downstream.accept(result);

        tryDrain();
    }

    static class Cached<T> {
        final ExecResult<T> result;
        final Instant expireAt;

        Cached(ExecResult<T> result, Instant expireAt) {
            this.result = result;
            this.expireAt = expireAt;
        }
    }
}

Я сократил количество шагов в цикле for с 25 до 3, чтобы вывод консоли был более лаконичным.

Успешное выполнение теста (без условия гонки)

Давайте посмотрим, как выглядит поток правильного выполнения:

[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] no pending execution and downstream is not null and cached is null...
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] calling yield...  
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream.connect.success
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream does not need fetching...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] no pending execution and downstream is not null and cached is null...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] calling yield...  
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream.connect.success
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream does not need fetching...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] no pending execution and downstream is not null and cached is null...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] calling yield...  
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream.connect.success
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream does not need fetching...

Как вы можете видеть каждую итерациювызывает кешированное обещание выдать 5 консольных логов.

Иэтот сценарий повторяется для всех трех обещаний, созданных внутри цикла for.

Неудачное выполнение теста (условие гонки)

Запуск теста с этими System.out.printf() тестом проваливался в несколько раз реже, чаще всегопотому что это опера ввода / выводаЭта операция потребляет некоторые циклы ЦП, а десинхронизированная часть кода имеет еще несколько циклов, чтобы избежать состояния гонки.Однако это все еще происходит, и теперь давайте посмотрим, как выглядит результат неудачного теста:

[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] no pending execution and downstream is not null and cached is null...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] calling yield...  
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] upstream.connect.success
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] no pending execution and downstream is not null and cached is null...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling yield... upstream is null... 
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] no pending execution and downstream is not null and cached is null...
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] calling yield...  
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream.connect.success
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream does not need fetching...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] no pending execution and downstream is not null and cached is null...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] calling yield...  
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream.connect.success
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream does not need fetching...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling receiveResult after catching exception class java.lang.NullPointerException
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] releasing upstream... (ExecResult{complete=false, error=java.lang.NullPointerException, value=null}) 

java.lang.NullPointerException
    at app.AnotherPromiseTest$TestCachingUpstream.yield(AnotherPromiseTest.java:120)
    at app.AnotherPromiseTest$TestCachingUpstream.tryDrain(AnotherPromiseTest.java:89)
    at app.AnotherPromiseTest$TestCachingUpstream.lambda$connect$0(AnotherPromiseTest.java:146)
    at ratpack.exec.internal.DefaultExecution.lambda$null$2(DefaultExecution.java:122)
    at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
    at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
    at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
    at ratpack.exec.internal.DefaultExecController$1.lambda$start$0(DefaultExecController.java:195)
    at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:309)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread$0(DefaultExecController.java:136)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

Это результат неудачного теста - я запустил его в IntelliJ IDEA и настроил выполнение этоготест повторить до отказа.Мне потребовалось некоторое время, чтобы этот тест не удался, но после того, как он несколько раз выполнялся, он, наконец, не прошел итерацию с номером 1500. В этом случае мы можем видеть, что условие гонки произошло с первым обещанием, созданным в цикле for.Вы можете видеть, что после освобождения вышестоящего объекта внутри receiveResult() метода

[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 

и вызова tryDrain перед выходом из метода, следующее выполнение кэшированного обещания еще не увидело ранее кэшированного результата и дошло дометод yield(downstream) снова.После upstream объект уже был освобожден, установив его значение в null.И метод yield(downstream) ожидает, что вышестоящий объект инициализирован правильно, в противном случае он выбрасывает NPE.

Я пытался отладить метод:

private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
    return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
}

Этот метод решает, нужно ли извлекать кэшированное обещание.Однако, когда я добавил какие-либо операторы регистрации, это начало вызывать StackOverflowError.Я предполагаю, что в редких случаях cached.expireAt.isBefore(clock.instant()) возвращает false, потому что cached объект происходит от AtomicReference, поэтому этот объект должен быть правильно передан между выполнением методов.

А вот и полный тестовый классЯ использовал в своих экспериментах:

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.internal.PlatformDependent;
import org.junit.Test;
import ratpack.exec.*;
import ratpack.exec.internal.CompleteExecResult;
import ratpack.exec.internal.DefaultExecution;
import ratpack.exec.internal.DefaultPromise;
import ratpack.exec.util.ParallelBatch;
import ratpack.func.Function;
import ratpack.func.Predicate;
import ratpack.test.exec.ExecHarness;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class AnotherPromiseTest {

    @Test
    public void foo() throws Exception {
        List<Promise<Integer>> promises = new ArrayList<>();

        for (int i = 0; i < 3; i++) {
            Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
                    continuation.resume(() -> down.success(12))
            )) {
                @Override
                public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
                    return transform(up -> {
                        return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
                    });
                }
            };

            p = p.cache();
            promises.add(p.map(v -> v + 1));
            promises.add(p.map(v -> v + 2));
        }

        ExecHarness.yieldSingle(c -> ParallelBatch.of(promises).yield()).getValueOrThrow();
    }

    private static class TestCachingUpstream<T> implements Upstream<T> {
        private final String id = UUID.randomUUID().toString();

        private Upstream<? extends T> upstream;

        private final Clock clock;
        private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
        private final Function<? super ExecResult<T>, Duration> ttlFunc;

        private final AtomicBoolean pending = new AtomicBoolean();
        private final AtomicBoolean draining = new AtomicBoolean();
        private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();

        public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
            this(upstream, ttl, Clock.systemUTC());
        }

        @VisibleForTesting
        TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
            this.upstream = upstream;
            this.ttlFunc = ttl;
            this.clock = clock;
        }

        private void tryDrain() {
            if (draining.compareAndSet(false, true)) {
                try {
                    TestCachingUpstream.Cached<? extends T> cached = ref.get();
                    if (needsFetch(cached)) {
                        if (pending.compareAndSet(false, true)) {
                            Downstream<? super T> downstream = waiting.poll();

                            System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);

                            if (downstream == null) {
                                pending.set(false);
                            } else {
                                try {
                                    yield(downstream);
                                } catch (Throwable e) {
                                    System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
                                    receiveResult(downstream, ExecResult.of(Result.error(e)));
                                }
                            }
                        }
                    } else {
                        System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
                        Downstream<? super T> downstream = waiting.poll();
                        while (downstream != null) {
                            downstream.accept(cached.result);
                            downstream = waiting.poll();
                        }
                    }
                } finally {
                    draining.set(false);
                }
            }

            if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
                tryDrain();
            }
        }

        private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
            return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
        }

        private void yield(final Downstream<? super T> downstream) throws Exception {
            System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
            upstream.connect(new Downstream<T>() {
                public void error(Throwable throwable) {
                    System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
                }

                @Override
                public void success(T value) {
                    System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, ExecResult.of(Result.success(value)));
                }

                @Override
                public void complete() {
                    System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, CompleteExecResult.get());
                }
            });
        }

        @Override
        public void connect(Downstream<? super T> downstream) throws Exception {
            TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
            if (needsFetch(cached)) {
                Promise.<T>async(d -> {
                    waiting.add(d);
                    tryDrain();
                }).result(downstream::accept);
            } else {
                downstream.accept(cached.result);
            }
        }

        private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
            Duration ttl = Duration.ofSeconds(0);
            try {
                ttl = ttlFunc.apply(result);
            } catch (Throwable e) {
                if (result.isError()) {
                    //noinspection ThrowableResultOfMethodCallIgnored
                    result.getThrowable().addSuppressed(e);
                } else {
                    result = ExecResult.of(Result.error(e));
                }
            }

            Instant expiresAt;
            if (ttl.isNegative()) {
                expiresAt = null; // eternal
                System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
                upstream = null; // release
            } else if (ttl.isZero()) {
                expiresAt = clock.instant().minus(Duration.ofSeconds(1));
            } else {
                expiresAt = clock.instant().plus(ttl);
            }

            ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
            pending.set(false);

            downstream.accept(result);

            tryDrain();
        }

        static class Cached<T> {
            final ExecResult<T> result;
            final Instant expireAt;

            Cached(ExecResult<T> result, Instant expireAt) {
                this.result = result;
                this.expireAt = expireAt;
            }
        }
    }
}

Надеюсь, это поможет.

...