Для одного из моих проектов, который должен поддерживать одновременное чтение и запись, мне нужен контейнер, который может буферизовать элементы, пока потребитель не заберет каждый элемент, находящийся в данный момент в буфере, сразу.Поскольку производители должны иметь возможность производить данные независимо от того, читал ли потребитель текущий буфер, я разработал собственную реализацию, которая с помощью AtomicReference
добавляет каждую запись к резервной копии ConcurrentLinkedQueue
до тех пор, пока не будет выполнено переворачивание, которое вызываеттекущая запись, которая будет возвращена при сохранении новой записи с пустой очередью и метаданными, которые будут храниться в этом AtomicReference
атомно.
Я нашел решение, такое как
public class FlippingDataContainer<E> {
private final AtomicReference<FlippingDataContainerEntry<E>> dataObj = new AtomicReference<>();
public FlippingDataContainer() {
dataObj.set(new FlippingDataContainerEntry<>(new ConcurrentLinkedQueue<>(), 0, 0, 0));
}
public FlippingDataContainerEntry<E> put(E value) {
if (null != value) {
while (true) {
FlippingDataContainerEntry<E> data = dataObj.get();
FlippingDataContainerEntry<E> updated = FlippingDataContainerEntry.from(data, value);
if (dataObj.compareAndSet(data, updated)) {
return merged;
}
}
}
return null;
}
public FlippingDataContainerEntry<E> flip() {
FlippingDataContainerEntry<E> oldData;
FlippingDataContainerEntry<E> newData = new FlippingDataContainerEntry<>(new ConcurrentLinkedQueue<>(), 0, 0, 0);
while (true) {
oldData = dataObj.get();
if (dataObj.compareAndSet(oldData, newData)) {
return oldData;
}
}
}
public boolean isEmptry() {
return dataObj.get().getQueue().isEmpty();
}
}
Кактекущее значение должно быть передано в очередь поддержки, что требует особой осторожности.Текущая реализация метода from(data, value)
выглядит примерно так:
static <E> FlippingDataContainerEntry<E> from(FlippingDataContainerEntry<E> data, E value) {
Queue<E> queue = new ConcurrentLinkedQueue<>(data.getQueue());
queue.add(value);
return new FlippingDataContainerEntry<>(queue,
data.getKeyLength() + (value.getKeyAsBytes() != null ? value.getKeyAsBytes().length : 0),
data.getValueLength() + (value.getValueAsBytes() != null ? value.getValueAsBytes().length : 0),
data.getAuxiliaryLength() + (value.getAuxiliaryAsBytes() != null ? value.getAuxiliaryAsBytes().length : 0));
}
Из-за возможных повторных попыток, вызванных другим потоком, обновили значение непосредственно перед тем, как этот поток мог выполнить обновление, мне нужно скопироватьфактическая очередь при каждой попытке записи, так как в противном случае запись будет добавлена в общую очередь, даже если элементарная ссылка не может быть обновлена.Поэтому простое добавление значения в общую очередь может привести к тому, что запись значения будет добавлена в очередь несколько раз, тогда как на самом деле это должно произойти только один раз.
Копирование всей очереди является довольно дорогой задачей, поэтому я возился спросто задайте текущую очередь вместо копирования очереди в методе from(data, value)
и вместо добавления элемента значения в общую очередь в блоке, выполняемом при обновлении:
public FlippingDataContainerEntry<E> put(E value) {
if (null != value) {
while (true) {
FlippingDataContainerEntry<E> data = dataObj.get();
FlippingDataContainerEntry<E> updated = FlippingDataContainerEntry.from(data, value);
if (data.compareAndSet(data, updated)) {
updated.getQueue().add(value);
return updated;
}
}
}
return null;
}
в пределах from(data, value)
Теперь я только устанавливаю очередь без непосредственного добавления элемента значения
static <E> FlippingDataContainerEntry<E> from(FlippingDataContainerEntry<E> data, E value) {
return new FlippingDataContainerEntry<>(data.getQueue(),
data.getKeyLength() + (value.getKeyAsBytes() != null ? value.getKeyAsBytes().length : 0),
data.getValueLength() + (value.getValueAsBytes() != null ? value.getValueAsBytes().length : 0),
data.getAuxiliaryLength() + (value.getAuxiliaryAsBytes() != null ? value.getAuxiliaryAsBytes().length : 0));
}
Хотя это позволяет выполнить тест в 10+ раз быстрее по сравнению с кодом, копирующим очередь, он также довольно часто не проходит тест потребления, как сейчасдобавление элемента значения в очередь может произойти сразу после того, как поток потребителя перевернет очередь и обработает данные, и, следовательно, не все элементы, по-видимому, потребляются.
Фактический вопрос теперь заключается в том, может ли копированиеиз очереди поддержки следует избегатьв повышении производительности, в то же время позволяя атомарно обновлять содержимое очереди с помощью алгоритмов без блокировок и, следовательно, также избегать потери некоторых записей в середине пути?