Высокая пропускная способность пишет на переменную в Java 8 Concurrency? - PullRequest
0 голосов
/ 03 февраля 2019

Если у меня есть простое целое число в программе на Java 8, которое доступно для чтения и записи несколькими потоками.

Если мне сказали, что приложение должно поддерживать чтение с высокой пропускной способностью и очень мало записей - ответ наэто довольно просто, я просто использую блокировку чтения-записи.Тогда несколько потоков могут выполнять чтение одновременно без блокировки - и блокирование происходит только тогда, когда выполняется нечастая запись.

Но в случае, если мне сказали, что приложение должно поддерживать записи с высокой пропускной способностью (то есть переменная общего доступа)часто обновляется разными темами).Независимо от того, какую блокировку я здесь использую, насколько я вижу, она всегда будет приводить к блокировке потоков - в том случае, когда поток получает блокировку для переменной и обновляет ее, остальные потоки, которые также пытаются обновитьпеременная просто должна ждать, пока они не получат блокировку - это правильно или я что-то упустил в Java 8?

Я мог бы уйти и написать какой-то асинхронный метод обновления для общей переменной, гдепоток вызывает метод обновления, который он возвращает немедленно, и я использую некоторую структуру данных под прикрытием, чтобы поставить в очередь записи в общую переменную.По крайней мере, таким образом я бы предотвратил блокировку потоков при попытке обновить общую переменную.При условии, что этот подход вызовет другие проблемы, например, если поток предположит, что он гарантирует запись def.удалось или мне нужно перезвонить, чтобы сообщить об успешном обновлении и т. д. Кроме чего-то подобного, я не вижу способа обойти блокировку при использовании любой блокировки в Java 8 для записи с высокой пропускной способностью?(Или я должен просто принять блокировку и использовать блокировку в любом случае, даже в случае записи с высокой пропускной способностью).Спасибо

1 Ответ

0 голосов
/ 03 февраля 2019

Строго говоря, Integer - вы можете использовать LongAdder, его реализация, похоже, именно для вашего случая.Если вам не безразличны вот некоторые дополнительные детали.

Он использует CAS (сравните и поменяйте местами) под капотом, очень похоже на AtomicLong, но с некоторыми отличиями.Прежде всего, фактический long value, который он содержит, обернут в так называемый Cell - в основном это класс, который позволяет cas (сравнивать и менять) value на новое значение, во многом как установщик, если выхочу.Этот Cell также помечен @sun.misc.Contended для предотвращения ложного обмена;Вот объяснение этому (из комментариев к коду):

Но атомарные объекты, находящиеся в массивах, будут склонны размещаться рядом друг с другом, и поэтому чаще всего будут совместно использовать строки кэша (с огромнымиотрицательное влияние на производительность) без этой меры предосторожности.

Реализация очень интересна здесь.Давайте посмотрим, что происходит, когда вы вызываете метод add(long x):

 public void add(long x) {
    Cell[] cs; long b, v; int m; Cell c;
    if ((cs = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (cs == null || (m = cs.length - 1) < 0 ||
            (c = cs[getProbe() & m]) == null ||
            !(uncontended = c.cas(v = c.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

Идея состоит в том, что если Cell [] cs равно нулю , то до этого не было споров, то есть long valueлибо не инициализирован, либо все предыдущие операции CAS выполнены всеми потоками.В этом случае попробуйте CAS новое значение до long value - если это сработало, все готово.Однако, если это не удалось, создается массив Cell [], так что каждый отдельный поток пытается работать в своем собственном пространстве, сводя к минимуму конфликты.

Следующее предложение - это то, что вас действительно волнует, если я вас понялвопрос правильно (и он мой, вообще не из комментариев к коду):

Проще говоря: если между потоками нет разногласий, работа выполняется так, как будто используется AtomicLong (вроде), в противном случае попробуйте создать отдельное пространство для каждого потока для работы.

Если вас интересуют некоторые дополнительные детали, которые мне показались интересными:

Cell[] всегда является степенью двойки (очень похоже на HashMap внутренний массив);затем каждый поток использует ThreadLocalRandom для создания некоторого хэш-кода, чтобы попытаться найти запись в массиве Cell [] cs для записи, или даже повторного хэширования, используя Marsaglia XorShif, чтобы попытаться найти свободный слот в этом массиве;размер массива ограничен количеством ядер, которые у вас есть (на самом деле, ближайшая степень от двух), размер этого массива можно изменить, чтобы он мог расти, и все эти операции выполняются с использованием спин-блокировки volatile int cellsBusy.Этот код превосходен, но, как я уже сказал, я не понимаю всего этого.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...