что не так с этим потокобезопасным генератором последовательности байтов? - PullRequest
6 голосов
/ 12 сентября 2011

Мне нужен генератор байтов, который бы генерировал значения из Byte.MIN_VALUE в Byte.MAX_VALUE.Когда он достигает MAX_VALUE, он должен начинаться заново с MIN_VALUE.

Я написал код, используя AtomicInteger (см. Ниже);тем не менее, код не работает должным образом, если к нему обращаются одновременно и если он сделан искусственно медленным с Thread.sleep () (если нет спящего, он работает нормально; однако, я подозреваю, что он слишком быстр для появления проблем параллелизма).

Код (с некоторым добавленным отладочным кодом):

public class ByteGenerator {

    private static final int INITIAL_VALUE = Byte.MIN_VALUE-1;

    private AtomicInteger counter = new AtomicInteger(INITIAL_VALUE);
    private AtomicInteger resetCounter = new AtomicInteger(0);

    private boolean isSlow = false;
    private long startTime;

    public byte nextValue() {
        int next = counter.incrementAndGet();
        //if (isSlow) slowDown(5);
        if (next > Byte.MAX_VALUE) {
            synchronized(counter) {
                int i = counter.get();
                //if value is still larger than max byte value, we reset it
                if (i > Byte.MAX_VALUE) {
                    counter.set(INITIAL_VALUE);
                    resetCounter.incrementAndGet();
                    if (isSlow) slowDownAndLog(10, "resetting");
                } else {
                    if (isSlow) slowDownAndLog(1, "missed");
                }
                next = counter.incrementAndGet();
            }
        }
        return (byte) next;
    }

    private void slowDown(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
        }
    }
    private void slowDownAndLog(long millis, String msg) {
        slowDown(millis);
        System.out.println(resetCounter + " " 
                           + (System.currentTimeMillis()-startTime) + " "
                           + Thread.currentThread().getName() + ": " + msg);
    }

    public void setSlow(boolean isSlow) {
        this.isSlow = isSlow;
    }
    public void setStartTime(long startTime) {
        this.startTime = startTime;
    }

}

И тест:

public class ByteGeneratorTest {

    @Test
    public void testGenerate() throws Exception {
        ByteGenerator g = new ByteGenerator();
        for (int n = 0; n < 10; n++) {
            for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; i++) {
                assertEquals(i, g.nextValue());
            }
        }
    }

    @Test
    public void testGenerateMultiThreaded() throws Exception {
        final ByteGenerator g = new ByteGenerator();
        g.setSlow(true);
        final AtomicInteger[] counters = new AtomicInteger[Byte.MAX_VALUE-Byte.MIN_VALUE+1];
        for (int i = 0; i < counters.length; i++) {
            counters[i] = new AtomicInteger(0);
        }
        Thread[] threads = new Thread[100];
        final CountDownLatch latch = new CountDownLatch(threads.length);
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; i++) {
                            byte value = g.nextValue();
                            counters[value-Byte.MIN_VALUE].incrementAndGet();
                        }
                    } finally {
                        latch.countDown();
                    }
                }
            }, "generator-client-" + i);
            threads[i].setDaemon(true);
        }
        g.setStartTime(System.currentTimeMillis());
        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }
        latch.await();
        for (int i = 0; i < counters.length; i++) {
            System.out.println("value #" + (i+Byte.MIN_VALUE) + ": " + counters[i].get());
        }
        //print out the number of hits for each value
        for (int i = 0; i < counters.length; i++) {
            assertEquals("value #" + (i+Byte.MIN_VALUE), threads.length, counters[i].get());
        }
    }

}

Результат на моем 2-ядерном компьютере - это значение# -128 получает 146 хитов (все они должны получить 100 хитов в равной степени, так как у нас 100 потоков).

Если у кого-то есть идеи, что не так с этим кодом, я весь в ушах / глазах.

ОБНОВЛЕНИЕ: для тех, кто спешит и не хочет прокрутить вниз, правильный (и самый короткий и самый элегантный) способ решить эту проблему в Java будет выглядеть так:

public byte nextValue() {
   return (byte) counter.incrementAndGet();
}

Спасибо, Хайнц!

Ответы [ 5 ]

9 голосов
/ 12 сентября 2011

Изначально Java хранила все поля как 4 или 8-байтовые значения, даже короткие и байтовые. Операции с полями просто делали бы битовую маскировку, чтобы уменьшить байты. Таким образом, мы могли бы очень легко сделать это:

public byte nextValue() {
   return (byte) counter.incrementAndGet();
}

Забавная маленькая головоломка, спасибо Neeme: -)

5 голосов
/ 12 сентября 2011

Вы принимаете решение incrementAndGet (), основываясь на старом значении counter.get (). Значение счетчика может снова достигнуть MAX_VALUE перед выполнением операции incrementAndGet () над счетчиком.

if (next > Byte.MAX_VALUE) {
    synchronized(counter) {
        int i = counter.get(); //here You make sure the the counter is not over the MAX_VALUE
        if (i > Byte.MAX_VALUE) {
            counter.set(INITIAL_VALUE);
            resetCounter.incrementAndGet();
            if (isSlow) slowDownAndLog(10, "resetting");
        } else {
            if (isSlow) slowDownAndLog(1, "missed"); //the counter can reach MAX_VALUE again if you wait here long enough
        }
        next = counter.incrementAndGet(); //here you increment on return the counter that can reach >MAX_VALUE in the meantime
    }
}

Чтобы это работало, нужно убедиться, что по устаревшей информации не принято никаких решений. Сбросьте счетчик или верните старое значение.

public byte nextValue() {
    int next = counter.incrementAndGet();

    if (next > Byte.MAX_VALUE) {
        synchronized(counter) {
            next = counter.incrementAndGet();
            //if value is still larger than max byte value, we reset it
            if (next > Byte.MAX_VALUE) {
                counter.set(INITIAL_VALUE + 1);
                next = INITIAL_VALUE + 1;
                resetCounter.incrementAndGet();
                if (isSlow) slowDownAndLog(10, "resetting");
            } else {
                if (isSlow) slowDownAndLog(1, "missed");
            }
        }
    }

    return (byte) next;
}
3 голосов
/ 12 сентября 2011

Ваш синхронизированный блок содержит только тело if. Он должен обернуть весь метод, включая if сам оператор. Или просто сделайте ваш метод nextValue синхронизированным. Кстати, в этом случае вам не нужны атомарные переменные вообще.

Я надеюсь, что это сработает для вас. Попробуйте использовать атомарные переменные, только если вам действительно нужен код с самой высокой производительностью, то есть оператор synchronized беспокоит вас. ИМХО в большинстве случаев это не так.

2 голосов
/ 12 сентября 2011

Если я правильно вас понимаю, вас волнует, что результаты nextValue находятся в диапазоне Byte.MIN_VALUE и Byte.MAX_VALUE, и вас не волнует значение, сохраненное в счетчике. Затем вы можете отобразить целые числа в байтах так, чтобы вам было показано требуемое поведение перечисления:

private static final int VALUE_RANGE = Byte.MAX_VALUE - Byte.MIN_VALUE + 1;
private final AtomicInteger counter = new AtomicInteger(0);

public byte nextValue() {
   return (byte) (counter.incrementAndGet() % VALUE_RANGE + Byte.MIN_VALUE - 1);
}

Осторожно, это непроверенный код. Но идея должна работать.

1 голос
/ 12 сентября 2011

Я кодировал следующую версию nextValue, используя compareAndSet, которая предназначена для использования в несинхронизированном блоке.Он прошел ваши юнит-тесты:

О, и я ввел новые константы для MIN_VALUE и MAX_VALUE, но вы можете игнорировать их, если хотите.

static final int LOWEST_VALUE = Byte.MIN_VALUE;
static final int HIGHEST_VALUE = Byte.MAX_VALUE;

private AtomicInteger counter = new AtomicInteger(LOWEST_VALUE - 1);
private AtomicInteger resetCounter = new AtomicInteger(0);

public byte nextValue() {
    int oldValue; 
    int newValue; 

    do {
        oldValue = counter.get();
        if (oldValue >= HIGHEST_VALUE) {
            newValue = LOWEST_VALUE;
            resetCounter.incrementAndGet();
            if (isSlow) slowDownAndLog(10, "resetting");
        } else {
            newValue = oldValue + 1;    
            if (isSlow) slowDownAndLog(1, "missed");
        }
    } while (!counter.compareAndSet(oldValue, newValue));
    return (byte) newValue;
}

compareAndSet() работает в сочетании с get() для управления параллелизмом.

В начале критической секции вы выполняете get(), чтобы получить старое значение.Затем вы выполняете некоторую функцию, зависящую только от старого значения, чтобы вычислить новое значение.Затем вы используете compareAndSet(), чтобы установить новое значение.Если AtomicInteger больше не равен старому значению во время выполнения compareAndSet() (из-за одновременной активности), он завершается неудачно, и вы должны начать заново.

Если у вас экстремальное количество параллелизма иВремя вычислений велико, вполне возможно, что compareAndSet() может много раз потерпеть неудачу, прежде чем преуспеет, и, возможно, стоит собрать статистику по этому вопросу, если вас это касается.

Я не предполагаю, что это лучше или хужеподход, чем простой синхронизированный блок, как предлагали другие, но я лично, вероятно, для простоты использовал бы синхронизированный блок.

РЕДАКТИРОВАТЬ : я отвечу на ваш фактический вопрос "Почему не мойработа? "

Ваш код имеет:

    int next = counter.incrementAndGet();
    if (next > Byte.MAX_VALUE) {

Поскольку эти две строки не защищены синхронизированным блоком, несколько потоков могут выполнять их одновременно, и все получают значения next> Byte.MAX_VALUE.Все они затем перейдут в синхронизированный блок и установят counter обратно на INITIAL_VALUE (один за другим, пока они ждут друг друга).

За прошедшие годы было написано огромное количествоиз-за ловушек попыток настроить производительность, не синхронизируя, когда это не кажется необходимым.Например, см. Двойная проверка блокировки

...