RX Java-буфер теряет элементы - PullRequest
0 голосов
/ 27 октября 2018

У меня есть PublishSubject с этой конфигурацией:

PublishSubject<Message> messageObserver = 
    messageObserver
    .filter(t -> test(t))
    .buffer(eventsSaveTimeSpanInSeconds, TimeUnit.SECONDS, eventsSaveCount)
    .subscribe(messages -> saveToDB(messages));

Различные темы моего приложения пишут сообщения на этот PublishSubject через onNext().

Как я вижу, buffer, лежащий в основе ObservableBufferTimed.BufferExactBoundedObserver, не является потокобезопасным, поскольку его onNext выглядит следующим образом:

public void onNext(T t) {
            U b;
            synchronized (this) {
                b = buffer;
                if (b == null) {
                    return;
                }

                b.add(t);

                if (b.size() < maxSize) {
                    return;
                }
                buffer = null;
                producerIndex++;
            }

            if (restartTimerOnMaxSize) {
                timer.dispose();
            }

            fastPathOrderedEmit(b, false, this);

            try {
                b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                actual.onError(e);
                dispose();
                return;
            }

            synchronized (this) {
                buffer = b;
                consumerIndex++;
            }
            if (restartTimerOnMaxSize) {
                timer = w.schedulePeriodically(this, timespan, timespan, unit);
            }
        }

Чтобы сделать случай состояния гонки более очевидным, я установил для параметров eventsSaveTimeSpanInSeconds и eventsSaveCount значение 1 (1 событие в 1 секунду).

Проблема появляется в этом блоке:

synchronized (this) {
                b = buffer;
                if (b == null) {
                    return;
                }

                b.add(t);

                if (b.size() < maxSize) {
                    return;
                }
                buffer = null;
                producerIndex++;
            }

Итак, если два сообщения буферизуются одновременно, то первое сообщение заполняет buffer и присваивает нулевую переменную буфера. Новый буфер будет инициализирован позже после синхронизированного блока. Если есть условие гонки, когда buffer равно нулю, второе сообщение не буферизуется из-за кода:

if (b == null) {
  return;
}

Это дефект или некорректное поведение буфера? Как я могу избежать этой ситуации?

1 Ответ

0 голосов
/ 04 ноября 2018

Используйте сериализованную тему, если несколько потоков хотят вызвать onNext:

Subject<Message> messageObserver = PublishSubject.<Message>create().toSerialized();

messageObserver
.filter(t -> test(t))
.buffer(eventsSaveTimeSpanInSeconds, TimeUnit.SECONDS, eventsSaveCount)
.subscribe(messages -> saveToDB(messages));

// from any thread now
messageObserver.onNext(message);
...