Я пытаюсь понять, какие гарантии в отношении видимости данных проектный реактор предоставляет коду приложения. Например, Я ожидаю, что приведенный ниже код даст сбой, но это не произойдет после миллиона итераций. Я изменяю состояние типичного POJO в потоке A и считываю его обратно из потока B. Гарантирует ли Reactor, что изменения POJO видны в потоке?
public class Main {
public static void main(String[] args) {
Integer result = Flux.range(1, 1_000_000)
.map(i -> {
Data data = new Data();
data.setValue(i);
data.setValueThreeTimes(i);
data.setValueObj(i + i);
return data;
})
.parallel(250)
.runOn(Schedulers.newParallel("par", 500))
.map(d -> {
d.setValueThreeTimes(d.getValueThreeTimes() + d.getValue());
return d;
})
.sequential()
.parallel(250)
.runOn(Schedulers.newParallel("par", 500))
.map(d -> {
d.setValueThreeTimes(d.getValueThreeTimes() + d.getValue());
return d;
})
// .sequential()
.map(d -> {
if (d.getValue() * 3 != d.getValueThreeTimes()) throw new RuntimeException("data corrupt error");
return d;
})
.reduce(() -> 0, (Integer sum, Data d) -> sum + d.getValueObj() + d.getValue())
.sequential()
.blockLast();
}
static class Data {
private int value;
private int valueThreeTimes;
private Integer valueObj;
public int getValueThreeTimes() {
return valueThreeTimes;
}
public void setValueThreeTimes(int valueThreeTimes) {
this.valueThreeTimes = valueThreeTimes;
}
public int getValue() {
return value;
}
@Override
public String toString() {
return "Data{" +
"value=" + value +
", valueObj=" + valueObj +
'}';
}
public void setValue(int value) {
this.value = value;
}
public Integer getValueObj() {
return valueObj;
}
public void setValueObj(Integer valueObj) {
this.valueObj = valueObj;
}
}
private static <T> T identityWithThreadLogging(T el, String operation) {
System.out.println(operation + " -- " + el + " -- " +
Thread.currentThread().getName());
return el;
}
}