Project Reactor и модель памяти Java - PullRequest
0 голосов
/ 01 ноября 2018

Я пытаюсь понять, какие гарантии в отношении видимости данных проектный реактор предоставляет коду приложения. Например, Я ожидаю, что приведенный ниже код даст сбой, но это не произойдет после миллиона итераций. Я изменяю состояние типичного POJO в потоке A и считываю его обратно из потока B. Гарантирует ли Reactor, что изменения POJO видны в потоке?

public class Main {
    public static void main(String[] args) {
        Integer result = Flux.range(1, 1_000_000)
                .map(i -> {
                    Data data = new Data();
                    data.setValue(i);
                    data.setValueThreeTimes(i);
                    data.setValueObj(i + i);
                    return data;
                })
                .parallel(250)
                .runOn(Schedulers.newParallel("par", 500))
                .map(d -> {
                    d.setValueThreeTimes(d.getValueThreeTimes() + d.getValue());
                    return d;
                })
                .sequential()
                .parallel(250)
                .runOn(Schedulers.newParallel("par", 500))
                .map(d -> {
                    d.setValueThreeTimes(d.getValueThreeTimes() + d.getValue());
                    return d;
                })
                //                .sequential()
                .map(d -> {
                    if (d.getValue() * 3 != d.getValueThreeTimes()) throw new RuntimeException("data corrupt error");
                    return d;
                })
                .reduce(() -> 0, (Integer sum, Data d) -> sum + d.getValueObj() + d.getValue())
                .sequential()
                .blockLast();
    }

    static class Data {
        private int value;
        private int valueThreeTimes;
        private Integer valueObj;

        public int getValueThreeTimes() {
            return valueThreeTimes;
        }

        public void setValueThreeTimes(int valueThreeTimes) {
            this.valueThreeTimes = valueThreeTimes;
        }

        public int getValue() {
            return value;
        }

        @Override
        public String toString() {
            return "Data{" +
                    "value=" + value +
                    ", valueObj=" + valueObj +
                    '}';
        }

        public void setValue(int value) {
            this.value = value;
        }

        public Integer getValueObj() {
            return valueObj;
        }

        public void setValueObj(Integer valueObj) {
            this.valueObj = valueObj;
        }
    }

    private static <T> T identityWithThreadLogging(T el, String operation) {
        System.out.println(operation + " -- " + el + " -- " +
                Thread.currentThread().getName());
        return el;
    }
}

1 Ответ

0 голосов
/ 05 ноября 2018

Спецификация Reactive Streams предусматривает, что для Flux или Mono (a Publisher) события onNext должны быть последовательными.

parallel() - это ParallelFlux, который немного смягчает это путем деления и завоевания: вы получаете несколько «рельсов», каждый из которых индивидуально придерживается спецификации, но в целом этого не происходит (распараллеливание между рельсами).

В свою очередь, sequential() возвращается в мир Flux и вводит барьер памяти, чтобы гарантировать, что результирующая последовательность соответствует спецификации RS.

...