Мне пришла в голову идея, которую я пытаюсь реализовать для стека без блокировки, который не использует подсчет ссылок для решения проблемы ABA, а также правильно обрабатывает восстановление памяти.По своей концепции он похож на RCU и опирается на две функции: пометка записи в списке как удаленной и отслеживание читателей, пересекающих список.Первый простой, он просто использует младший бит указателя.Последнее - моя «умная» попытка подхода к реализации неограниченного стека без блокировки.
По сути, когда любой поток пытается пройти по списку, увеличивается один атомарный счетчик (list.entries).Когда обход завершен, увеличивается второй счетчик (list.exits).
Распределение узлов обрабатывается push, а освобождение - pop.
Операции push и pop довольно простыпохож на простую реализацию стека без блокировок, но узлы, помеченные для удаления, должны быть пройдены, чтобы получить запись без пометки.Таким образом, push в основном аналогичен вставке связанного списка.
Операция pop аналогично проходит по списку, но использует atomic_fetch_or, чтобы пометить узлы как удаленные при обходе, пока не достигнет неотмеченного узла.
После обхода списка из 0 или более отмеченных узлов поток, который выскочил, попытается выполнить CAS в начале стека.По крайней мере, один поток одновременно выталкивает сообщение, и после этого все читатели, входящие в стек, больше не будут видеть ранее помеченные узлы.
Поток, который успешно обновляет список, затем загружает атомарный list.entries и, в основном,spin-load atomic.exits до тех пор, пока счетчик не превысит list.entries.Это должно подразумевать, что все читатели «старой» версии списка завершили.Затем поток просто освобождает список отмеченных узлов, которые он поменял с верхней части списка.
Таким образом, следствием операции pop должно быть (я думаю), что не может быть проблем с ABA, потому что узлыосвобожденные не возвращаются в пригодный для использования пул указателей до тех пор, пока не завершатся все одновременные читатели, использующие их, и, очевидно, проблема восстановления памяти также будет решена по той же причине.
Так или иначе, это теория,но я все еще ломаю голову над реализацией, потому что в настоящее время она не работает (в многопоточном случае).Похоже, что я получаю некоторые записи после бесплатных проблем среди других вещей, но мне трудно определить проблему, или, возможно, мои предположения ошибочны, и это просто не сработает.
Любое понимание было бы оченьценится как по концепции, так и по подходам к отладке кода.
Вот мой текущий (неработающий) код (скомпилировать с помощью gcc -D_GNU_SOURCE -std = c11 -Wall -O0 -g -pthread -o listlist.c):
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/resource.h>
#include <stdio.h>
#include <unistd.h>
#define NUM_THREADS 8
#define NUM_OPS (1024 * 1024)
typedef uint64_t list_data_t;
typedef struct list_node_t {
struct list_node_t * _Atomic next;
list_data_t data;
} list_node_t;
typedef struct {
list_node_t * _Atomic head;
int64_t _Atomic size;
uint64_t _Atomic entries;
uint64_t _Atomic exits;
} list_t;
enum {
NODE_IDLE = (0x0),
NODE_REMOVED = (0x1 << 0),
NODE_FREED = (0x1 << 1),
NODE_FLAGS = (0x3),
};
static __thread struct {
uint64_t add_count;
uint64_t remove_count;
uint64_t added;
uint64_t removed;
uint64_t mallocd;
uint64_t freed;
} stats;
#define NODE_IS_SET(p, f) (((uintptr_t)p & f) == f)
#define NODE_SET_FLAG(p, f) ((void *)((uintptr_t)p | f))
#define NODE_CLR_FLAG(p, f) ((void *)((uintptr_t)p & ~f))
#define NODE_POINTER(p) ((void *)((uintptr_t)p & ~NODE_FLAGS))
list_node_t * list_node_new(list_data_t data)
{
list_node_t * new = malloc(sizeof(*new));
new->data = data;
stats.mallocd++;
return new;
}
void list_node_free(list_node_t * node)
{
free(node);
stats.freed++;
}
static void list_add(list_t * list, list_data_t data)
{
atomic_fetch_add_explicit(&list->entries, 1, memory_order_seq_cst);
list_node_t * new = list_node_new(data);
list_node_t * _Atomic * next = &list->head;
list_node_t * current = atomic_load_explicit(next, memory_order_seq_cst);
do
{
stats.add_count++;
while ((NODE_POINTER(current) != NULL) &&
NODE_IS_SET(current, NODE_REMOVED))
{
stats.add_count++;
current = NODE_POINTER(current);
next = ¤t->next;
current = atomic_load_explicit(next, memory_order_seq_cst);
}
atomic_store_explicit(&new->next, current, memory_order_seq_cst);
}
while(!atomic_compare_exchange_weak_explicit(
next, ¤t, new,
memory_order_seq_cst, memory_order_seq_cst));
atomic_fetch_add_explicit(&list->exits, 1, memory_order_seq_cst);
atomic_fetch_add_explicit(&list->size, 1, memory_order_seq_cst);
stats.added++;
}
static bool list_remove(list_t * list, list_data_t * pData)
{
uint64_t entries = atomic_fetch_add_explicit(
&list->entries, 1, memory_order_seq_cst);
list_node_t * start = atomic_fetch_or_explicit(
&list->head, NODE_REMOVED, memory_order_seq_cst);
list_node_t * current = start;
stats.remove_count++;
while ((NODE_POINTER(current) != NULL) &&
NODE_IS_SET(current, NODE_REMOVED))
{
stats.remove_count++;
current = NODE_POINTER(current);
current = atomic_fetch_or_explicit(¤t->next,
NODE_REMOVED, memory_order_seq_cst);
}
uint64_t exits = atomic_fetch_add_explicit(
&list->exits, 1, memory_order_seq_cst) + 1;
bool result = false;
current = NODE_POINTER(current);
if (current != NULL)
{
result = true;
*pData = current->data;
current = atomic_load_explicit(
¤t->next, memory_order_seq_cst);
atomic_fetch_add_explicit(&list->size,
-1, memory_order_seq_cst);
stats.removed++;
}
start = NODE_SET_FLAG(start, NODE_REMOVED);
if (atomic_compare_exchange_strong_explicit(
&list->head, &start, current,
memory_order_seq_cst, memory_order_seq_cst))
{
entries = atomic_load_explicit(&list->entries, memory_order_seq_cst);
while ((int64_t)(entries - exits) > 0)
{
pthread_yield();
exits = atomic_load_explicit(&list->exits, memory_order_seq_cst);
}
list_node_t * end = NODE_POINTER(current);
list_node_t * current = NODE_POINTER(start);
while (current != end)
{
list_node_t * tmp = current;
current = atomic_load_explicit(¤t->next, memory_order_seq_cst);
list_node_free(tmp);
current = NODE_POINTER(current);
}
}
return result;
}
static list_t list;
pthread_mutex_t ioLock = PTHREAD_MUTEX_INITIALIZER;
void * thread_entry(void * arg)
{
sleep(2);
int id = *(int *)arg;
for (int i = 0; i < NUM_OPS; i++)
{
bool insert = random() % 2;
if (insert)
{
list_add(&list, i);
}
else
{
list_data_t data;
list_remove(&list, &data);
}
}
struct rusage u;
getrusage(RUSAGE_THREAD, &u);
pthread_mutex_lock(&ioLock);
printf("Thread %d stats:\n", id);
printf("\tadded = %lu\n", stats.added);
printf("\tremoved = %lu\n", stats.removed);
printf("\ttotal added = %ld\n", (int64_t)(stats.added - stats.removed));
printf("\tadded count = %lu\n", stats.add_count);
printf("\tremoved count = %lu\n", stats.remove_count);
printf("\tadd average = %f\n", (float)stats.add_count / stats.added);
printf("\tremove average = %f\n", (float)stats.remove_count / stats.removed);
printf("\tmallocd = %lu\n", stats.mallocd);
printf("\tfreed = %lu\n", stats.freed);
printf("\ttotal mallocd = %ld\n", (int64_t)(stats.mallocd - stats.freed));
printf("\tutime = %f\n", u.ru_utime.tv_sec
+ u.ru_utime.tv_usec / 1000000.0f);
printf("\tstime = %f\n", u.ru_stime.tv_sec
+ u.ru_stime.tv_usec / 1000000.0f);
pthread_mutex_unlock(&ioLock);
return NULL;
}
int main(int argc, char ** argv)
{
struct {
pthread_t thread;
int id;
}
threads[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++)
{
threads[i].id = i;
pthread_create(&threads[i].thread, NULL, thread_entry, &threads[i].id);
}
for (int i = 0; i < NUM_THREADS; i++)
{
pthread_join(threads[i].thread, NULL);
}
printf("Size = %ld\n", atomic_load(&list.size));
uint32_t count = 0;
list_data_t data;
while(list_remove(&list, &data))
{
count++;
}
printf("Removed %u\n", count);
}