Как кешировать Single с истечением времени - PullRequest
1 голос
/ 26 марта 2019

У меня большой код вычисления:

static class Computer {
    public Single<String> compute() {
        return Single.fromCallable(() -> {
            Thread.sleep(1000);
            System.out.println("Computation...");
            return "3.14";
        });
    }
}

Я хочу оценить это один раз для нескольких подписчиков, то есть кеша.

Computer computer = new Computer();
Single<String> c = computer.compute().cache();
c.blockingGet();
c.blockingGet();

Но я хочу, чтобы срок действия кеша истекал через определенный периодвремени:

Thread.sleep(2000);
//This should trigger computation
c.blockingGet();
c.blockingGet();

Есть идеи?

1 Ответ

0 голосов
/ 28 марта 2019

После некоторых попыток я не смог создать эту настройку со встроенными Observable s. Я думаю, что это возможно, но IMO вам понадобится Subject для этой цели, что, я думаю, излишне для этой конкретной установки.

Решение, которое я нашел, состоит в том, чтобы создать для этой цели специальный оператор, который самостоятельно выполняет кэшированный тайм-аут для данной функции.

Класс будет выглядеть примерно так:

class TimeoutSingleCache<Downstream, Upstream> implements SingleOperator<Downstream, Upstream> {

    private final Object LOCK = new Object();

    // Cache
    private long prevCallTime = -1;
    @Nullable
    private volatile Downstream cachedValue = null;

    // Inputs
    private final long invalidationTimeoutMillis;
    private final Function<Upstream, Downstream> operation;

    public TimeoutSingleCache(
            long invalidationTimeoutMillis,
            Function<Upstream, Downstream> operation
    ) {
        this.invalidationTimeoutMillis = invalidationTimeoutMillis;
        this.operation = operation;
    }

    @Override
    public SingleObserver<? super Upstream> apply(SingleObserver<? super Downstream> observer) throws Exception {
        return new SingleObserver<Upstream>() {

            @Override
            public void onSubscribe(Disposable d) {
                observer.onSubscribe(d);
            }

            @Override
            public void onSuccess(Upstream upstream) {
                long currentTime = System.currentTimeMillis();

                // Could be faster with double-check lock - not the point here
                synchronized (LOCK) {
                    if (cachedValue == null || prevCallTime + invalidationTimeoutMillis < currentTime) {
                        prevCallTime = currentTime;
                        try {
                            cachedValue = operation.apply(upstream);
                        } catch (Exception e) {
                            observer.onError(e);
                        }
                    }
                }

                observer.onSuccess(cachedValue);
            }

            @Override
            public void onError(Throwable e) {
                observer.onError(e);
            }
        };
    }
}

Использование:

public static void main(String[] args) throws InterruptedException {
    Single<String> cachedComputation = Single.just(1)
            .lift(new TimeoutSingleCache<>(100L, input -> {
                System.out.println("Computation!");
                return "" + input + " : 3.14";
            }));

    System.out.println("1");
    System.out.println(cachedComputation.blockingGet());
    System.out.println("2");
    System.out.println(cachedComputation.blockingGet());
    Thread.sleep(200L);
    System.out.println("3");
    System.out.println(cachedComputation.blockingGet());
    System.out.println("4");
    System.out.println(cachedComputation.blockingGet());
}

Выход:

1
Computation!
1 : 3.14
2
1 : 3.14
3
Computation!
1 : 3.14
4
1 : 3.14

Надеюсь, это помогло. Большой вопрос между прочим.

...