Замените синхронизированный цикл atomic + while в случае низкой конкуренции за блокировку - PullRequest
0 голосов
/ 07 ноября 2018

У меня есть две функции, которые должны выполняться в критической секции:

public synchronized void f1() { ... }
public synchronized void f2() { ... }

Предположим, что поведение выглядит следующим образом:

  • f1 почти никогда не вызывается.На самом деле, в нормальных условиях этот метод никогда не вызывается.Если в любом случае вызывается f1, он должен быстро вернуться.
  • f2 вызывается с очень высокой скоростью.Он возвращается очень быстро.
  • Эти методы никогда не вызывают друг друга, и нет также и повторного входа.

Другими словами, очень низкий уровень разногласий.Поэтому, когда вызывается f2, у нас есть некоторые накладные расходы для получения блокировки, которая предоставляется сразу в 99,9% случаев.Мне интересно, есть ли подходы, чтобы избежать этих издержек.

Я предложил следующую альтернативу:

private final AtomicInteger lock = new AtomicInteger(0);

public void f1() {
    while (!lock.compareAndSet(0, 1)) {}

    try {
        ...
    } finally {
        lock.set(0);
    }
}

public void f2() {
    while (!lock.compareAndSet(0, 2)) {}

    try {
        ...
    } finally {
        lock.set(0);
    }
}

Существуют ли другие подходы?Пакет java.util.concurrent предлагает что-то изначально?

обновление

Хотя я собираюсь задать общий вопрос, некоторая информация о моей ситуации:

f1: этот метод создает новый удаленный поток, если по какой-то причине текущий становится поврежденным, например, из-за тайм-аута.Удаленный поток может рассматриваться как соединение сокетов, которое использует удаленную очередь, начиная с заданного местоположения:

private Stream stream;

public synchronized void f1() {
     final Stream stream = new Stream(...);

     if (this.stream != null) {
         stream.setPosition(this.stream.getPosition());
     }
     this.stream = stream;
     return stream;
}

f2: этот метод увеличивает позицию потока.Это простой установщик:

public synchronized void f2(Long p) {
    stream.setPosition(p);
}

Здесь stream.setPosition(Long) также реализован как простой установщик:

public class Stream {
    private volatile Long position = 0;

    public void setPosition(Long position) {
        this.position = position;
    }
}

В Stream текущая позиция будет отправлена ​​насервер периодически асинхронно.Обратите внимание, что Stream не реализован мной самостоятельно.

Моя идея состояла в том, чтобы ввести сравнение и обмен, как показано выше, и пометить stream как volatile.

Ответы [ 3 ]

0 голосов
/ 07 ноября 2018

Другой подход заключается в использовании временной отметки, которая работает как счетчик изменений. Это хорошо работает, если у вас высокое отношение чтения к записи.

Другой подход состоит в том, чтобы иметь неизменный объект, который хранит состояние через AtomicReference. Это хорошо работает, если у вас очень высокое отношение чтения к записи.

0 голосов
/ 08 ноября 2018

Из описания и кода вашего примера я сделал вывод следующее:

  1. Stream имеет свою внутреннюю позицию, и вы также отслеживаете самые последние position извне. Вы используете это как своего рода «точку возобновления»: когда вам нужно повторно инициализировать поток, вы продвигаете его до этой точки.

  2. Последний известный position может быть устаревшим; Я предполагаю это, основываясь на вашем утверждении, что поток периодически асинхронно уведомляет сервер о его текущей позиции.

  3. В момент вызова f1 известно, что поток находится в плохом состоянии.

  4. Функции f1 и f2 имеют доступ к одним и тем же данным и могут работать одновременно. Однако, ни f1, ни f2 не будут одновременно работать против самого себя . Другими словами, у вас почти есть однопоточная программа, за исключением редких случаев, когда выполняются и f1, и f2.

    [ Примечание: Мое решение на самом деле не волнует, если f1 вызывается одновременно с самим собой; важно только то, что f2 не вызывается одновременно с самим собой]

Если что-то из этого не так, то решение, приведенное ниже, неверно. Черт возьми, это может быть неправильно в любом случае, либо из-за некоторых упущенных деталей, либо из-за того, что я допустил ошибку. Написание кода с низкой блокировкой сложно , и именно поэтому вам следует избегать его, если вы не заметили фактическую проблему производительности .

static class Stream {
    private long position = 0L;

    void setPosition(long position) {
        this.position = position;
    }
}

final static class StreamInfo {
    final Stream stream = new Stream();
    volatile long resumePosition = -1;

    final void setPosition(final long position) {
        stream.setPosition(position);
        resumePosition = position;
    }
}

private final Object updateLock = new Object();
private final AtomicReference<StreamInfo> currentInfo = new AtomicReference<>(new StreamInfo());

void f1() {
    synchronized (updateLock) {
        final StreamInfo oldInfo = currentInfo.getAndSet(null);
        final StreamInfo newInfo = new StreamInfo();

        if (oldInfo != null && oldInfo.resumePosition > 0L) {
            newInfo.setPosition(oldInfo.resumePosition);
        }

        // Only `f2` can modify `currentInfo`, so update it last.
        currentInfo.set(newInfo);

        // The `f2` thread might be waiting for us, so wake them up.
        updateLock.notifyAll();
    }
}

void f2(final long newPosition) {
    while (true) {
        final StreamInfo s = acquireStream();

        s.setPosition(newPosition);
        s.resumePosition = newPosition;

        // Make sure the stream wasn't replaced while we worked.
        // If it was, run again with the new stream.
        if (acquireStream() == s) {
            break;
        }
    }
}

private StreamInfo acquireStream() {
    // Optimistic concurrency: hope we get a stream that's ready to go.
    // If we fail, branch off into a slower code path that waits for it.
    final StreamInfo s = currentInfo.get();
    return s != null ? s : acquireStreamSlow();
}

private StreamInfo acquireStreamSlow() {
    synchronized (updateLock) {
        while (true) {
            final StreamInfo s = currentInfo.get();

            if (s != null) {
                return s;
            }

            try {
                updateLock.wait();
            }
            catch (final InterruptedException ignored) {
            }
        }
    }
}

Если поток вышел из строя и заменяется на f1, возможно, что более ранний вызов f2 все еще выполнял некоторые операции с потоком (ныне несуществующим). Я предполагаю, что это нормально, и что он не будет вызывать нежелательных побочных эффектов (кроме тех, которые уже присутствуют в вашей версии на основе блокировки). Я делаю это предположение, потому что мы уже установили в приведенном выше списке, что ваша точка возобновления может быть устаревшей, и мы также установили, что f1 вызывается только тогда, когда известно, что поток находится в плохом состоянии.

Исходя из моих тестов JMH, этот подход примерно в 3 раза быстрее, чем CAS или синхронизированные версии (которые довольно близки).

0 голосов
/ 07 ноября 2018

Ваш пример не делает то, что вы хотите. Вы фактически выполняете свой код, когда используется блокировка . Попробуйте что-то вроде этого:

public void f1() {
    while (!lock.compareAndSet(0, 1)) {
    }

    try {
        ...
    } finally {
        lock.set(0);
    }
}

Чтобы ответить на ваш вопрос, я не верю, что это будет быстрее, чем использование synchronized методов, и этот метод труднее читать и понимать.

...