Параллелизм Java - улучшение коллекции копирования при чтении - PullRequest
2 голосов
/ 13 декабря 2011

У меня есть многопоточное приложение, в котором общий список работает с частотой записи и чтения.

В частности, многие потоки будут выгружать данные в список, а затем - позже - другой работник получитснимок для сохранения в хранилище данных.

Это похоже на обсуждение по этому вопросу .

Там предоставляется следующее решение:

class CopyOnReadList<T> {

    private final List<T> items = new ArrayList<T>();

    public void add(T item) {
        synchronized (items) {
            // Add item while holding the lock.
            items.add(item);
        }
    }

    public List<T> makeSnapshot() {
        List<T> copy = new ArrayList<T>();
        synchronized (items) {
            // Make a copy while holding the lock.
            for (T t : items) copy.add(t);
        }
        return copy;
    }

}

Однако в этом сценарии (и, как я понял из моего вопроса здесь ), толькоодин поток может записать в список поддержки в любой момент времени.

Есть ли способ разрешить высококонкурентные записи в список поддержки, которые блокируются только во время вызова makeSnapshot()?

Ответы [ 5 ]

3 голосов
/ 13 декабря 2011

синхронизируется (~ 20 нс) довольно быстро, и хотя другие операции могут разрешить параллелизм, они могут быть медленнее.

private final Lock lock = new ReentrantLock();
private List<T> items = new ArrayList<T>();

public void add(T item) {
    lock.lock();
    // trivial lock time.
    try {
        // Add item while holding the lock.
        items.add(item);
    } finally {
        lock.unlock();
    }
}

public List<T> makeSnapshot() {
    List<T> copy = new ArrayList<T>(), ret;
    lock.lock();
    // trivial lock time.
    try {
        ret = items;
        items = copy;
    } finally {
        lock.unlock();
    }
    return ret;
}

public static void main(String... args) {
    long start = System.nanoTime();
    Main<Integer> ints = new Main<>();
    for (int j = 0; j < 100 * 1000; j++) {
        for (int i = 0; i < 1000; i++)
            ints.add(i);
        ints.makeSnapshot();
    }
    long time = System.nanoTime() - start;
    System.out.printf("The average time to add was %,d ns%n", time / 100 / 1000 / 1000);
}

печатает

The average time to add was 28 ns

Это означает, что если высоздавая 30 миллионов записей в секунду, вы будете иметь в среднем один поток, обращающийся к списку.Если вы создаете 60 миллионов в секунду, у вас будут проблемы с параллелизмом, однако у вас, вероятно, будет гораздо больше проблем с ресурсами на этом этапе.

Использование Lock.lock () и Lock.unlock () может бытьбыстрее, когда есть высокий коэффициент конкуренции.Однако я подозреваю, что ваши потоки будут тратить большую часть времени на создание создаваемых объектов, а не на ожидание добавления объектов.

2 голосов
/ 13 декабря 2011

Вы можете использовать ConcurrentDoublyLinkedList.Здесь есть отличная реализация ConcurrentDoublyLinkedList .

До тех пор, пока вы выполняете итерацию вперед по списку при создании снимка, все должно быть хорошо.Эта реализация всегда сохраняет прямую цепочку.Обратная цепь иногда неточная.

1 голос
/ 13 декабря 2011

Прежде всего, вам следует выяснить, действительно ли это слишком медленно. Добавки к ArrayList s равны O(1) в удачном случае, поэтому, если список имеет соответствующий начальный размер, CopyOnReadList.add - это просто проверка границ и присвоение слоту массива, что довольно быстро. (И, пожалуйста, помните, что CopyOnReadList было написано, чтобы быть понятным, а не производительным.)

Если вам нужна операция без блокировки, вы можете получить что-то вроде этого:

class ConcurrentStack<T> {
    private final AtomicReference<Node<T>> stack = new AtomicReference<>();

    public void add(T value){
        Node<T> tail, head;
        do {
            tail = stack.get();
            head = new Node<>(value, tail);
        } while (!stack.compareAndSet(tail, head));
    }
    public Node<T> drain(){
        // Get all elements from the stack and reset it
        return stack.getAndSet(null);
    }
}
class Node<T> {
    // getters, setters, constructors omitted
    private final T value;
    private final Node<T> tail;
}

Обратите внимание, что хотя добавления к этой структуре должны справляться с высокой конкуренцией, у нее есть несколько недостатков. Вывод из drain выполняется довольно медленно, он использует довольно много памяти (как и все связанные списки), и вы также получаете вещи в обратном порядке вставки. (Кроме того, он не проверен и не проверен, и может фактически засосать ваше приложение. Но это всегда риск с использованием кода от какого-то случайного чувака в промежутках.)

0 голосов
/ 14 декабря 2011

Вы можете использовать ReadWriteLock , чтобы разрешить нескольким потокам параллельно выполнять операции добавления в списке поддержки, но только один поток для создания моментального снимка.Пока снимок готовится, все остальные запросы на добавление и снимок удерживаются.

ReadWriteLock поддерживает пару связанных блокировок, одну для операций только для чтения и одну для записи.Блокировка чтения может удерживаться одновременно несколькими потоками считывателя, если нет писателей.Блокировка записи является эксклюзивной.

class CopyOnReadList<T> {

    // free to use any concurrent data structure, ConcurrentLinkedQueue used as an example
    private final ConcurrentLinkedQueue<T> items = new ConcurrentLinkedQueue<T>();
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock shared = rwLock.readLock();
    private final Lock exclusive = rwLock.writeLock(); 

    public void add(T item) {
        shared.lock(); // multiple threads can attain the read lock
        // try-finally is overkill if items.add() never throws exceptions
        try {
          // Add item while holding the lock.
          items.add(item);
        } finally {
          shared.unlock();
        }
    }

    public List<T> makeSnapshot() {
        List<T> copy = new ArrayList<T>(); // probably better idea to use a LinkedList or the ArrayList constructor with initial size
        exclusive.lock(); // only one thread can attain write lock, all read locks are also blocked
        // try-finally is overkill if for loop never throws exceptions
        try {
          // Make a copy while holding the lock.
          for (T t : items) {
            copy.add(t);
          }
        } finally {
          exclusive.unlock();
        }
        return copy;
    }

}

Редактировать:

Блокировка чтения-записи названа так, потому что она основана на проблеме читателей-писателей, а не на том, как она используется,Используя блокировку чтения-записи, мы можем сделать так, чтобы несколько потоков достигли блокировок чтения, но только один поток достигал блокировки записи исключительно.В этом случае проблема обратная - мы хотим, чтобы несколько потоков писали (добавляли) и читали только потоки (делали снимок).Итак, мы хотим, чтобы несколько потоков использовали блокировку чтения, даже если они на самом деле мутируют.Только поток исключительно делает снимок, используя блокировку записи, хотя снимок только читает.Исключительно означает, что во время создания снимка никакие другие запросы на добавление или снимок не могут одновременно обслуживаться другими потоками.

Как указывало @PeterLawrey, очередь Concurrent будет сериализовать записи, хотя блокировки будут использоваться дляминимальная продолжительность, насколько это возможно.Мы можем использовать любую другую параллельную структуру данных, например ConcurrentDoublyLinkedList .Очередь используется только в качестве примера.Основная идея - использование блокировок чтения-записи.

0 голосов
/ 13 декабря 2011

Да, есть способ.Если вы знаете, это похоже на то, как делал ConcurrentHashMap.

Вы должны создать свою собственную структуру данных не из одного списка для всех записывающих потоков, а использовать несколько независимых списков.Каждый из таких списков должен быть защищен своей собственной блокировкой.Метод .add () должен выбрать список для добавления текущего элемента на основе Thread.currentThread.id (например, просто id% listsCount).Это даст вам хорошие свойства параллелизма для .add () - в лучшем случае потоки listsCount смогут писать без конфликтов.

В makeSnapshot () вы должны просто перебирать все списки, и для каждого списка вызахватите его, заблокируйте и скопируйте контент.

Это всего лишь идея - есть много мест, где ее можно улучшить.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...