Давайте представим параллельную очередь SPS C (один производитель / один потребитель) без блокировки.
- Поток производителя читает
head
, tail
, cached_tail
и пишет head
, cached_tail
. - Поток потребителя читает
head
, tail
, cached_head
и записывает tail
, cached head
.
Обратите внимание, что cached_tail
доступен только потоку производителя, просто Например, cached_head
доступен только для потока пользователя. Их можно рассматривать как локальные переменные частного потока, поэтому они не синхронизированы, поэтому не определены как atomi c.
Структура данных очереди следующая:
#include <atomic>
#include <cstddef>
#include <thread>
struct spsc_queue
{
/// ...
// Producer variables
alignas(std::hardware_destructive_interference_size) std::atomic<size_t> head; // shared
size_t cached_tail; // non-shared
// Consumer variables
alignas(std::hardware_destructive_interference_size) std::atomic<size_t> tail; // shared
size_t cached_head; // non-shared
std::byte padding[std::hardware_destructive_interference_size - sizeof(tail) - sizeof(cached_head)];
};
Так как Я хочу избежать ложного совместного использования , я выровнял head
и tail
по размеру строки кэша L1.
Псевдокод-i sh реализация push
/ pop
Операции следующие:
bool push(const void* elems, size_t n)
{
size_t h = atomic_load(head, relaxed);
if (num_remaining_storage(h, cached_tail) < n)
{
cached_tail = atomic_load(tail, acquire);
if (num_remaining_storage(h, cached_tail) < n)
return false;
}
// write from elems
atomic_store(head, h + n, release);
return true;
}
bool pop(void* elems, size_t n)
{
size_t t = atomic_load(tail, relaxed);
if (num_stored_elements(cached_head, t) < n)
{
cached_head = atomic_load(head, acquire);
if (num_stored_elements(cached_head, t) < n)
return false;
}
// read to elems
atomic_store(tail, t + n, release);
return true;
}
void wait_and_push(const void* elems, size_t n)
{
size_t h = atomic_load(head, relaxed);
while (num_remaining_storage(h, cached_tail) < n)
cached_tail = atomic_load(tail, acquire);
// write from elems
atomic_store(head, h + n, release);
}
void wait_and_pop(void* elems, size_t n)
{
size_t t = atomic_load(tail, relaxed);
while (num_stored_elements(cached_head, t) < n)
cached_head = atomic_load(head, acquire);
// write to elems
atomic_store(tail, t + n, release);
}
При инициализации (не перечисленной здесь) все индексы установлены на 0
. Функции num_remaining_storage
и num_stored_elements
являются функциями const
, выполняющими простые вычисления на основе переданных аргументов и неизменной емкости очереди - они не выполняют ни одного атома c чтения или записи.
Теперь вопрос в : нужно ли также выравнивать cached_tail
и cached_head
, чтобы полностью избежать ложного разделения любого из индексов, или все в порядке, как есть. Поскольку cached_tail
является частным производителем, а cached_head
является частным потребителем, я думаю, что cached_tail
может находиться в той же строке кэша, что и head
(строка кэша производителя), так же, как cached_head
в той же строке кэша, что и tail
(строка кэша потребителя) без ложного обмена, чтобы когда-либо происходить.
Я что-то упустил?