Контейнер без блокировки с откидными буферами - PullRequest
1 голос
/ 10 марта 2019

Для одного из моих проектов, который должен поддерживать одновременное чтение и запись, мне нужен контейнер, который может буферизовать элементы, пока потребитель не заберет каждый элемент, находящийся в данный момент в буфере, сразу.Поскольку производители должны иметь возможность производить данные независимо от того, читал ли потребитель текущий буфер, я разработал собственную реализацию, которая с помощью AtomicReference добавляет каждую запись к резервной копии ConcurrentLinkedQueue до тех пор, пока не будет выполнено переворачивание, которое вызываеттекущая запись, которая будет возвращена при сохранении новой записи с пустой очередью и метаданными, которые будут храниться в этом AtomicReference атомно.

Я нашел решение, такое как

public class FlippingDataContainer<E> {

  private final AtomicReference<FlippingDataContainerEntry<E>> dataObj = new AtomicReference<>();

  public FlippingDataContainer() {
    dataObj.set(new FlippingDataContainerEntry<>(new ConcurrentLinkedQueue<>(), 0, 0, 0));
  }

  public FlippingDataContainerEntry<E> put(E value) {
    if (null != value) {
      while (true) {
        FlippingDataContainerEntry<E> data = dataObj.get();
        FlippingDataContainerEntry<E> updated = FlippingDataContainerEntry.from(data, value);
        if (dataObj.compareAndSet(data, updated)) {
          return merged;
        }
      }
    }
    return null;
  }

  public FlippingDataContainerEntry<E> flip() {
    FlippingDataContainerEntry<E> oldData;
    FlippingDataContainerEntry<E> newData = new FlippingDataContainerEntry<>(new ConcurrentLinkedQueue<>(), 0, 0, 0);
    while (true) {
      oldData = dataObj.get();
      if (dataObj.compareAndSet(oldData, newData)) {
        return oldData;
      }
    }
  }

  public boolean isEmptry() {
    return dataObj.get().getQueue().isEmpty();
  }
}

Кактекущее значение должно быть передано в очередь поддержки, что требует особой осторожности.Текущая реализация метода from(data, value) выглядит примерно так:

static <E> FlippingDataContainerEntry<E> from(FlippingDataContainerEntry<E> data, E value) {
  Queue<E> queue = new ConcurrentLinkedQueue<>(data.getQueue());
  queue.add(value);
  return new FlippingDataContainerEntry<>(queue,
      data.getKeyLength() + (value.getKeyAsBytes() != null ? value.getKeyAsBytes().length : 0),
      data.getValueLength() + (value.getValueAsBytes() != null ? value.getValueAsBytes().length : 0),
      data.getAuxiliaryLength() + (value.getAuxiliaryAsBytes() != null ? value.getAuxiliaryAsBytes().length : 0));
}

Из-за возможных повторных попыток, вызванных другим потоком, обновили значение непосредственно перед тем, как этот поток мог выполнить обновление, мне нужно скопироватьфактическая очередь при каждой попытке записи, так как в противном случае запись будет добавлена ​​в общую очередь, даже если элементарная ссылка не может быть обновлена.Поэтому простое добавление значения в общую очередь может привести к тому, что запись значения будет добавлена ​​в очередь несколько раз, тогда как на самом деле это должно произойти только один раз.

Копирование всей очереди является довольно дорогой задачей, поэтому я возился спросто задайте текущую очередь вместо копирования очереди в методе from(data, value) и вместо добавления элемента значения в общую очередь в блоке, выполняемом при обновлении:

public FlippingDataContainerEntry<E> put(E value) {
  if (null != value) {
    while (true) {
      FlippingDataContainerEntry<E> data = dataObj.get();
      FlippingDataContainerEntry<E> updated = FlippingDataContainerEntry.from(data, value);
      if (data.compareAndSet(data, updated)) {
        updated.getQueue().add(value);
        return updated;
      }
    }
  }
  return null;
}

в пределах from(data, value)Теперь я только устанавливаю очередь без непосредственного добавления элемента значения

static <E> FlippingDataContainerEntry<E> from(FlippingDataContainerEntry<E> data, E value) {
  return new FlippingDataContainerEntry<>(data.getQueue(),
      data.getKeyLength() + (value.getKeyAsBytes() != null ? value.getKeyAsBytes().length : 0),
      data.getValueLength() + (value.getValueAsBytes() != null ? value.getValueAsBytes().length : 0),
      data.getAuxiliaryLength() + (value.getAuxiliaryAsBytes() != null ? value.getAuxiliaryAsBytes().length : 0));
}

Хотя это позволяет выполнить тест в 10+ раз быстрее по сравнению с кодом, копирующим очередь, он также довольно часто не проходит тест потребления, как сейчасдобавление элемента значения в очередь может произойти сразу после того, как поток потребителя перевернет очередь и обработает данные, и, следовательно, не все элементы, по-видимому, потребляются.

Фактический вопрос теперь заключается в том, может ли копированиеиз очереди поддержки следует избегатьв повышении производительности, в то же время позволяя атомарно обновлять содержимое очереди с помощью алгоритмов без блокировок и, следовательно, также избегать потери некоторых записей в середине пути?

Ответы [ 2 ]

1 голос
/ 10 марта 2019

Во-первых, давайте констатируем очевидное: лучшее решение - избегать написания таких пользовательских классов.Возможно, что-то столь же простое, как java.util.concurrent.LinkedTransferQueue будет работать так же хорошо и будет менее подвержено ошибкам.И если LinkedTransferQueue не работает, то как насчет LMAX или чего-то подобного?Вы смотрели на существующие решения?

И если вам все еще нужно / нужно индивидуальное решение, то у меня есть набросок немного другого подхода, который позволит избежать копирования:

Идеядля операций put вращаться вокруг некоторой атомарной переменной, пытаясь установить ее.Если потоку удается установить его, он получает эксклюзивный доступ к текущей очереди, что означает, что он может добавить к нему.После добавления он сбрасывает атомарную переменную, чтобы позволить другим потокам добавлять.Это в основном спин-блокировка .Таким образом, конфликт между потоками происходит до добавления в очередь, а не после.

0 голосов
/ 11 марта 2019

Мне нужно копировать текущую очередь при каждой попытке записи

Ваша идея звучит как RCU (https://en.wikipedia.org/wiki/Read-copy-update). Сборка мусора в Java делает RCU намного проще, решаяпроблема освобождения для вас (я думаю).

Если я правильно понял из краткого обзора вашего вопроса, ваши «читатели» на самом деле хотят «потребовать» все текущее содержимое контейнера для себя.они тоже пишущие, но вместо read + copy они могут просто создать пустой контейнер и атомарно обменяться ссылкой на верхний уровень, чтобы указать на это (таким образом, запрашивая старый контейнер для монопольного доступа.)

Большим преимуществом RCU является то, что сама структура данных контейнера не обязательно должна быть везде атомарной, если у вас есть ссылка на нее, никто больше ее не модифицирует.


Единственная хитростьчасть приходит, когда автор хочет добавить новый материал в непустой контейнер. Затем вы копируете существующий контейнер и изменяете копию, иПопытайтесь в CAS (сравнить-обменять, то есть compareAndSet()) обновленную копию в совместно используемом верхнем уровне AtomicReference.

Автор не может просто безоговорочно обмениваться, поскольку это может привести кпустой контейнер и некуда его положить.Если это не нормально для писателя держаться за партию работы и вращаться в ожидании читателя, чтобы очистить очередь ...


Я предполагаю, что у ваших писателей есть партииработать, чтобы поставить в очередь сразу;в противном случае RCU, вероятно, слишком дорог для авторов. Извините, если я пропустил деталь в вашем вопросе, которая исключает это.Я не использую Java на регулярной основе, поэтому я просто пишу это быстро, если это полезно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...