Идея реализации Lock Free стека - в настоящее время не работает - PullRequest
0 голосов
/ 11 июня 2018

Мне пришла в голову идея, которую я пытаюсь реализовать для стека без блокировки, который не использует подсчет ссылок для решения проблемы ABA, а также правильно обрабатывает восстановление памяти.По своей концепции он похож на RCU и опирается на две функции: пометка записи в списке как удаленной и отслеживание читателей, пересекающих список.Первый простой, он просто использует младший бит указателя.Последнее - моя «умная» попытка подхода к реализации неограниченного стека без блокировки.

По сути, когда любой поток пытается пройти по списку, увеличивается один атомарный счетчик (list.entries).Когда обход завершен, увеличивается второй счетчик (list.exits).

Распределение узлов обрабатывается push, а освобождение - pop.

Операции push и pop довольно простыпохож на простую реализацию стека без блокировок, но узлы, помеченные для удаления, должны быть пройдены, чтобы получить запись без пометки.Таким образом, push в основном аналогичен вставке связанного списка.

Операция pop аналогично проходит по списку, но использует atomic_fetch_or, чтобы пометить узлы как удаленные при обходе, пока не достигнет неотмеченного узла.

После обхода списка из 0 или более отмеченных узлов поток, который выскочил, попытается выполнить CAS в начале стека.По крайней мере, один поток одновременно выталкивает сообщение, и после этого все читатели, входящие в стек, больше не будут видеть ранее помеченные узлы.

Поток, который успешно обновляет список, затем загружает атомарный list.entries и, в основном,spin-load atomic.exits до тех пор, пока счетчик не превысит list.entries.Это должно подразумевать, что все читатели «старой» версии списка завершили.Затем поток просто освобождает список отмеченных узлов, которые он поменял с верхней части списка.

Таким образом, следствием операции pop должно быть (я думаю), что не может быть проблем с ABA, потому что узлыосвобожденные не возвращаются в пригодный для использования пул указателей до тех пор, пока не завершатся все одновременные читатели, использующие их, и, очевидно, проблема восстановления памяти также будет решена по той же причине.

Так или иначе, это теория,но я все еще ломаю голову над реализацией, потому что в настоящее время она не работает (в многопоточном случае).Похоже, что я получаю некоторые записи после бесплатных проблем среди других вещей, но мне трудно определить проблему, или, возможно, мои предположения ошибочны, и это просто не сработает.

Любое понимание было бы оченьценится как по концепции, так и по подходам к отладке кода.

Вот мой текущий (неработающий) код (скомпилировать с помощью gcc -D_GNU_SOURCE -std = c11 -Wall -O0 -g -pthread -o listlist.c):

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

#include <sys/resource.h>

#include <stdio.h>
#include <unistd.h>

#define NUM_THREADS 8
#define NUM_OPS (1024 * 1024)

typedef uint64_t list_data_t;

typedef struct list_node_t {
    struct list_node_t * _Atomic next;
    list_data_t data;
} list_node_t;

typedef struct {
    list_node_t * _Atomic head;
    int64_t _Atomic size;
    uint64_t _Atomic entries;
    uint64_t _Atomic exits;
} list_t;

enum {
    NODE_IDLE    = (0x0),
    NODE_REMOVED = (0x1 << 0),
    NODE_FREED   = (0x1 << 1),
    NODE_FLAGS    = (0x3),
};

static __thread struct {
    uint64_t add_count;
    uint64_t remove_count;
    uint64_t added;
    uint64_t removed;
    uint64_t mallocd;
    uint64_t freed;
} stats;

#define NODE_IS_SET(p, f) (((uintptr_t)p & f) == f)
#define NODE_SET_FLAG(p, f) ((void *)((uintptr_t)p | f))
#define NODE_CLR_FLAG(p, f) ((void *)((uintptr_t)p & ~f))
#define NODE_POINTER(p) ((void *)((uintptr_t)p & ~NODE_FLAGS))

list_node_t * list_node_new(list_data_t data)
{
    list_node_t * new = malloc(sizeof(*new));
    new->data = data;
    stats.mallocd++;

    return new;
}

void list_node_free(list_node_t * node)
{
    free(node);
    stats.freed++;
}

static void list_add(list_t * list, list_data_t data)
{
    atomic_fetch_add_explicit(&list->entries, 1, memory_order_seq_cst);

    list_node_t * new = list_node_new(data);
    list_node_t * _Atomic * next = &list->head;
    list_node_t * current = atomic_load_explicit(next,  memory_order_seq_cst);
    do
    {
        stats.add_count++;
        while ((NODE_POINTER(current) != NULL) &&
                NODE_IS_SET(current, NODE_REMOVED))
        {
                stats.add_count++;
                current = NODE_POINTER(current);
                next = &current->next;
                current = atomic_load_explicit(next, memory_order_seq_cst);
        }
        atomic_store_explicit(&new->next, current, memory_order_seq_cst);
    }
    while(!atomic_compare_exchange_weak_explicit(
            next, &current, new,
            memory_order_seq_cst, memory_order_seq_cst));

    atomic_fetch_add_explicit(&list->exits, 1, memory_order_seq_cst);
    atomic_fetch_add_explicit(&list->size, 1, memory_order_seq_cst);
    stats.added++;
}

static bool list_remove(list_t * list, list_data_t * pData)
{
    uint64_t entries = atomic_fetch_add_explicit(
            &list->entries, 1, memory_order_seq_cst);

    list_node_t * start = atomic_fetch_or_explicit(
            &list->head, NODE_REMOVED, memory_order_seq_cst);
    list_node_t * current = start;

    stats.remove_count++;
    while ((NODE_POINTER(current) != NULL) &&
            NODE_IS_SET(current, NODE_REMOVED))
    {
        stats.remove_count++;
        current = NODE_POINTER(current);
        current = atomic_fetch_or_explicit(&current->next,
                NODE_REMOVED, memory_order_seq_cst);
    }

    uint64_t exits = atomic_fetch_add_explicit(
            &list->exits, 1, memory_order_seq_cst) + 1;

    bool result = false;
    current = NODE_POINTER(current);
    if (current != NULL)
    {
        result = true;
        *pData = current->data;

        current = atomic_load_explicit(
                &current->next, memory_order_seq_cst);

        atomic_fetch_add_explicit(&list->size,
                -1, memory_order_seq_cst);

        stats.removed++;
    }

    start = NODE_SET_FLAG(start, NODE_REMOVED);
    if (atomic_compare_exchange_strong_explicit(
            &list->head, &start, current,
            memory_order_seq_cst, memory_order_seq_cst))
    {
        entries = atomic_load_explicit(&list->entries, memory_order_seq_cst);
        while ((int64_t)(entries - exits) > 0)
        {
            pthread_yield();
            exits = atomic_load_explicit(&list->exits, memory_order_seq_cst);
        }

        list_node_t * end = NODE_POINTER(current);
        list_node_t * current = NODE_POINTER(start);
        while (current != end)
        {
            list_node_t * tmp = current;
            current = atomic_load_explicit(&current->next, memory_order_seq_cst);
            list_node_free(tmp);
            current = NODE_POINTER(current);
        }
    }

    return result;
}

static list_t list;

pthread_mutex_t ioLock = PTHREAD_MUTEX_INITIALIZER;

void * thread_entry(void * arg)
{
    sleep(2);
    int id = *(int *)arg;

    for (int i = 0; i < NUM_OPS; i++)
    {
        bool insert = random() % 2;

        if (insert)
        {
            list_add(&list, i);
        }
        else
        {
            list_data_t data;
            list_remove(&list, &data);
        }
    }

    struct rusage u;
    getrusage(RUSAGE_THREAD, &u);

    pthread_mutex_lock(&ioLock);
    printf("Thread %d stats:\n", id);
    printf("\tadded = %lu\n", stats.added);
    printf("\tremoved = %lu\n", stats.removed);
    printf("\ttotal added = %ld\n", (int64_t)(stats.added - stats.removed));
    printf("\tadded count = %lu\n", stats.add_count);
    printf("\tremoved count = %lu\n", stats.remove_count);
    printf("\tadd average = %f\n", (float)stats.add_count / stats.added);
    printf("\tremove average = %f\n", (float)stats.remove_count / stats.removed);
    printf("\tmallocd = %lu\n", stats.mallocd);
    printf("\tfreed = %lu\n", stats.freed);
    printf("\ttotal mallocd = %ld\n", (int64_t)(stats.mallocd - stats.freed));
    printf("\tutime = %f\n", u.ru_utime.tv_sec
            + u.ru_utime.tv_usec / 1000000.0f);
    printf("\tstime = %f\n", u.ru_stime.tv_sec
                    + u.ru_stime.tv_usec / 1000000.0f);
    pthread_mutex_unlock(&ioLock);

    return NULL;
}

int main(int argc, char ** argv)
{
    struct {
            pthread_t thread;
            int id;
    }
    threads[NUM_THREADS];
    for (int i = 0; i < NUM_THREADS; i++)
    {
        threads[i].id = i;
        pthread_create(&threads[i].thread, NULL, thread_entry, &threads[i].id);
    }

    for (int i = 0; i < NUM_THREADS; i++)
    {
        pthread_join(threads[i].thread, NULL);
    }

    printf("Size = %ld\n", atomic_load(&list.size));

    uint32_t count = 0;

    list_data_t data;
    while(list_remove(&list, &data))
    {
        count++;
    }
    printf("Removed %u\n", count);
}

1 Ответ

0 голосов
/ 11 июня 2018

Вы упоминаете, что пытаетесь решить проблему ABA, но описание и код на самом деле являются попыткой решить более сложную проблему: восстановление памяти .

Эта проблема обычно возникаетв «удалении» функционала коллекций без блокировок, реализованных на языках без сборки мусора.Основная проблема заключается в том, что поток, удаляющий узел из общей структуры, часто не знает, когда безопасно освободить удаленный узел, потому что другие операции чтения могут по-прежнему иметь ссылку на него.Решение этой проблемы часто, в качестве побочного эффекта, также решает проблему ABA: речь идет о успешной операции CAS, даже если базовый указатель (и состояние объекта) были изменены по крайней мере дважды втем временем, заканчивая исходным значением , но представляющим совершенно другое состояние.

Проблема АБА проще в том смысле, что существует несколько простых решений проблемы АБА, в частности, которые не приводят к решению проблемы «восстановления памяти».Также проще в том смысле, что аппаратное обеспечение, которое может обнаружить изменение местоположения, например, с помощью LL / SC или примитивов транзакционной памяти, может вообще не показывать проблему.

Таким образом, вы охотитесьдля решения проблемы восстановления памяти, и это также позволит избежать проблемы ABA.

Суть вашей проблемы заключается в следующем утверждении:

Поток, который затем успешно обновляет списокзагружает atomic list.entries и в основном спин-загружает atomic.exits до тех пор, пока этот счетчик не превысит list.entries. Это должно означать, что все читатели "старой" версии списка завершили работу. Затем поток просто освобождает список отмеченных узлов, которые он поменял с верхней части списка.

Эта логика не работает.Ожидание, пока list.exits (вы говорите atomic.exits , но я думаю, что это опечатка, поскольку вы говорите только о list.exits в другом месте), будет больше, чем list.entries, только скажет вам, что теперь больше общих выходов , чем было записей в момент, когда мутирующий поток захватил количество записей.Однако эти выходы могут быть вызваны приходом и уходом новых читателей: это вовсе не означает, что все старые читатели закончили , как вы утверждаете!

Вот простой пример.Сначала поток записи T1 и поток чтения T2 получают доступ к списку примерно в одно и то же время, поэтому list.entries равен 2, а list.exits равен 0. Поток записи извлекает узел и сохраняет текущее значение (2)из list.entries и ожидает, пока lists.exits будет больше 2. Теперь прибывают еще три потока чтения, T3, T4, T5, быстро прочитают список и уйдут.Теперь lists.exits равно 3, и ваше условие выполнено, а T1 освобождает узел.T2 никуда не делся и взрывается, так как читает свободную ноду!

Основная идея, которая у вас есть, может работать, но ваш двухконтурный подход, в частности, определенно не работает.

Это хорошо изученная проблема, поэтому вам не нужно изобретать собственный алгоритм (см. Ссылку выше) или даже писать собственный код, поскольку такие вещи, как librcu и concurrencykit уже существует.

Для образовательных целей

Если вы хотели бы сделать эту работу в образовательных целях, один из подходов заключается в том, чтобы обеспечить поступление потоков впосле начала изменения используйте другой набор list.entry/exit счетчиков.Одним из способов сделать это может быть счетчик генерации, и когда автор хочет изменить список, он увеличивает счетчик генерации, что заставляет новых читателей переключаться на другой набор list.entry/exit счетчиков.

Теперьавтору просто нужно ждать list.entry[old] == list.exists[old], что означает, что все старые читатели ушли.Вы также можете просто использовать один счетчик для каждого поколения: у вас нет двух entry/exit счетчиков (хотя это может помочь уменьшить конкуренцию).

Конечно, вы знаете, есть новая проблема управления этим списком отдельных счетчиков для поколения ... который выглядит как первоначальная проблема создания списка без блокировок!Эта проблема немного проще, потому что вы можете установить разумные ограничения на количество поколений «в полете» и просто распределить их все заранее, или вы можете реализовать ограниченный тип списка без блокировок, который легче рассуждатьпотому что добавления и удаления происходят только в голове или хвосте.

...