У меня есть PublishSubject с этой конфигурацией:
PublishSubject<Message> messageObserver =
messageObserver
.filter(t -> test(t))
.buffer(eventsSaveTimeSpanInSeconds, TimeUnit.SECONDS, eventsSaveCount)
.subscribe(messages -> saveToDB(messages));
Различные темы моего приложения пишут сообщения на этот PublishSubject
через onNext()
.
Как я вижу, buffer
, лежащий в основе ObservableBufferTimed.BufferExactBoundedObserver
, не является потокобезопасным, поскольку его onNext выглядит следующим образом:
public void onNext(T t) {
U b;
synchronized (this) {
b = buffer;
if (b == null) {
return;
}
b.add(t);
if (b.size() < maxSize) {
return;
}
buffer = null;
producerIndex++;
}
if (restartTimerOnMaxSize) {
timer.dispose();
}
fastPathOrderedEmit(b, false, this);
try {
b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
dispose();
return;
}
synchronized (this) {
buffer = b;
consumerIndex++;
}
if (restartTimerOnMaxSize) {
timer = w.schedulePeriodically(this, timespan, timespan, unit);
}
}
Чтобы сделать случай состояния гонки более очевидным, я установил для параметров eventsSaveTimeSpanInSeconds
и eventsSaveCount
значение 1 (1 событие в 1 секунду).
Проблема появляется в этом блоке:
synchronized (this) {
b = buffer;
if (b == null) {
return;
}
b.add(t);
if (b.size() < maxSize) {
return;
}
buffer = null;
producerIndex++;
}
Итак, если два сообщения буферизуются одновременно, то первое сообщение заполняет buffer
и присваивает нулевую переменную буфера. Новый буфер будет инициализирован позже после синхронизированного блока. Если есть условие гонки, когда buffer
равно нулю, второе сообщение не буферизуется из-за кода:
if (b == null) {
return;
}
Это дефект или некорректное поведение буфера? Как я могу избежать этой ситуации?