Как остановить дорогой расчет в Spring Flux - PullRequest
0 голосов
/ 15 февраля 2020

Я использую Spring реактив в качестве сервера для создания дорогостоящего поколения и возврата результатов в Flux один за другим. Преимущество этого заключается в том, что генерация прекращается, если запрос отменяется (например, из-за ограничений и слишком жестких ограничений). Мой код выглядит следующим образом:

    public Flux<Entity> generate(int nbrOfEntitiesToGenerate, Constaints constraints) {
        return Flux.range(0, nbrOfEntitiesToGenerate)
            .map(x -> Generator.expensiveGeneration(constraints)
//            .subscribeOn(Schedulers.parallel())
            ;
    }

Это делает только половину того, что я хочу, я не делаю следующий вызов expensiveGeneration при отмене, но не прекращает запуск дорогого поколения, которое может никогда не произойти fini sh, если ограничения слишком жесткие. Как я могу это сделать, пожалуйста.

Дополнительный вопрос, если вы знаете, как я могу сгенерировать x сущностей в parralel, чтобы максимизировать использование моих потоков (конечно, без запуска ВСЕХ поколений сразу).

Заранее спасибо.

1 Ответ

0 голосов
/ 12 марта 2020

Создать Scheduler из ExecutorService просто, но вам нужно сохранить вызываемый Future<?>, если вы хотите отменить. Я изменил Generator, чтобы сохранить его и обернуть метод cancel, который вызывается, когда Flux обрабатывает doOnCancel.

public class FluxPlay {
    public static void main(String[] args) {
        new FluxPlay().run();
    }
    private void run() {
        Flux<LocalDateTime> f = generate(10);
        Disposable d = f.subscribeOn(Schedulers.single()).subscribe(System.out::println);
        try {
            Thread.sleep(4500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        d.dispose();
    }

    private Flux<LocalDateTime> generate(int nbrOfEntitiesToGenerate) {
        Generator generator = new Generator();
        return Flux.range(0, nbrOfEntitiesToGenerate)
        .map(x -> generator.expensiveGeneration())
        .doOnCancel(generator::cancel)
        .doFinally(generator::shutdown)
        .publishOn(Schedulers.fromExecutor(generator::submit));
    }
}

и:

public class Generator {
    Future<?> f;
    ExecutorService es = Executors.newSingleThreadExecutor();
    public void submit(Runnable command) {
      f = es.submit(command);
    }
    public void cancel() {
        f.cancel(true);
    }
    public void shutdown(SignalType st) {
        es.shutdown();
    }
    public LocalDateTime expensiveGeneration() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            System.out.println("Interrupted");
        }
        return LocalDateTime.now();
    }
}
...