Pthread: увеличение времени выполнения программы относительно количества потоков. - PullRequest
0 голосов
/ 11 ноября 2019

Я пытаюсь построить эффективную параллельную хэш-карту, используя pthreads, C.

Ниже приведена моя реализация

#include <stdlib.h>
#include <stddef.h>
#include <pthread.h>
#include <stdint.h>
#include <limits.h>
#include <stdio.h>
#include <linux/limits.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>

#define ENTRIES_PER_BUCKET 3

struct Bucket
{
    pthread_mutex_t mutex;
    void **keys;
    int *vals;
    struct Bucket *next;
};

struct Concurrent_Map
{
    struct Bucket *buckets;
    map_keys_equality *keys_eq;
    map_key_hash *khash;
    int capacity;
};

int concurrent_map_allocate /*@ <t> @*/ (map_keys_equality *keq, map_key_hash *khash,
                                         unsigned capacity,
                                         struct Concurrent_Map **map_out)

{

    struct Concurrent_Map *old_map_val = *map_out;
    struct Concurrent_Map *map_alloc = malloc(sizeof(struct Concurrent_Map));
    if (map_alloc == NULL)
    {
        return 0;
    }
    *map_out = (struct Concurrent_Map *)map_alloc;

    struct Bucket *buckets_alloc = (struct Bucket *)malloc(sizeof(struct Bucket) * (int)capacity);

    if (buckets_alloc == NULL)
    {
        free(map_alloc);
        *map_out = old_map_val;
        return 0;
    }
    (*map_out)->buckets = buckets_alloc;
    (*map_out)->capacity = capacity;
    (*map_out)->keys_eq = keq;
    (*map_out)->khash = khash;

    unsigned i;

    for (i = 0; i < capacity; i++)
    {
        if (pthread_mutex_init(&((*map_out)->buckets[i].mutex), NULL) == 0)
        {
            void **key_alloc = malloc(sizeof(void *) * (ENTRIES_PER_BUCKET));

            if (key_alloc != NULL)
            {
                (*map_out)->buckets[i].keys = key_alloc;

                int k;
                for (k = 0; k < ENTRIES_PER_BUCKET; k++)
                {

                    (*map_out)->buckets[i].keys[k] = NULL;
                }
            }

            int *vals_alloc = malloc(sizeof(int) * (ENTRIES_PER_BUCKET));

            if (vals_alloc != NULL)
            {
                (*map_out)->buckets[i].vals = vals_alloc;

                int k;
                for (k = 0; k < ENTRIES_PER_BUCKET; k++)
                {
                    (*map_out)->buckets[i].vals[k] = -1;
                }
            }

            (*map_out)->buckets[i].next = NULL;
        }
    }

    // todo exceptions in allocation

    return 1;
}

static unsigned loop(unsigned k, unsigned capacity)
{
    unsigned g = k % capacity;

    unsigned res = (g + capacity) % capacity;

    return res;
}

int concurrent_map_get(struct Concurrent_Map *map, void *key, int *value_out)

{
    map_key_hash *khash = map->khash;
    unsigned hash = khash(key);

    unsigned start = loop(hash, map->capacity);
    unsigned bucket_index = loop(start + 0, map->capacity);

    if (bucket_index < map->capacity)
    {

        struct Bucket *bucket = &(map->buckets[bucket_index]);

        pthread_mutex_t mutex = bucket->mutex;

        pthread_mutex_lock(&mutex);

        int j;
        do
        {
            for (j = 0; j < ENTRIES_PER_BUCKET; j++)
            {
                int val = bucket->vals[j];
                if (map->keys_eq(bucket->keys[j], key))
                {
                    if (bucket->vals[j] == val)
                    {
                        *value_out = val;
                        return 1;
                    }
                    else
                    {
                        *value_out = -1;
                        return 0;
                    }
                }
            }
            if (bucket->next != NULL)
            {
                bucket = (bucket->next);
            }
            else
            {
                break;
                pthread_mutex_unlock(&mutex);
            }

            pthread_mutex_unlock(&mutex);

        } while (1);
    }
    *value_out = -1;
    return 0;
}

int concurrent_map_put(struct Concurrent_Map *map, void *key, int value)

{
    map_key_hash *khash = map->khash;
    unsigned hash = khash(key);

    unsigned start = loop(hash, map->capacity);
    unsigned bucket_index = loop(start + 0, map->capacity);

    struct Bucket *bucket = &(map->buckets[bucket_index]);

    int j;

    do
    {

        pthread_mutex_t mutex = bucket->mutex;

        int j;

        pthread_mutex_lock(&mutex);

        for (j = 0; j < ENTRIES_PER_BUCKET; j++)
        {
            if (map->keys_eq(bucket->keys[j], key))
            {
                pthread_mutex_unlock(&mutex);
                return 0;
            }
            else if (bucket->keys[j] == NULL)
            {
                bucket->vals[j] = value;
                bucket->keys[j] = key;
                pthread_mutex_unlock(&mutex);
                return 1;
            }
        }
        if (bucket->next == NULL)

        {
            // allocate a new bucket

            struct Bucket *new_bucket = malloc(sizeof(struct Bucket));

            if (pthread_mutex_init(&(new_bucket->mutex), NULL) == 0)
            {
                void **key_alloc = malloc(sizeof(void *) * (ENTRIES_PER_BUCKET));

                if (key_alloc != NULL)
                {
                    new_bucket->keys = key_alloc;

                    int k;
                    for (k = 0; k < ENTRIES_PER_BUCKET; k++)
                    {
                        new_bucket->keys[k] = NULL;
                    }
                }

                int *vals_alloc = malloc(sizeof(int) * (ENTRIES_PER_BUCKET));

                if (vals_alloc != NULL)
                {
                    new_bucket->vals = vals_alloc;

                    int k;
                    for (k = 0; k < ENTRIES_PER_BUCKET; k++)
                    {
                        new_bucket->vals[k] = -1;
                    }
                }

                bucket->next = new_bucket;
            }
        }

        pthread_mutex_unlock(&mutex);
        bucket = bucket->next;

    } while (1);

    return 0;
}

int concurrent_map_erase(struct Concurrent_Map *map, void *key, void **trash)

{

    map_key_hash *khash = map->khash;
    unsigned hash = khash(key);

    unsigned start = loop(hash, map->capacity);
    unsigned bucket_index = loop(start + 0, map->capacity);

    struct Bucket *bucket = &(map->buckets[bucket_index]);

    int j;

    do
    {

        pthread_mutex_t mutex = bucket->mutex;

        int j;

        pthread_mutex_lock(&mutex);

        for (j = 0; j < ENTRIES_PER_BUCKET; j++)
        {
            if (map->keys_eq(bucket->keys[j], key))
            {
                bucket->vals[j] = -1;
                bucket->keys[j] = NULL;
                pthread_mutex_unlock(&mutex);
                return 1;
            }
        }

        pthread_mutex_unlock(&mutex);
        if (bucket->next != NULL)
        {
            bucket = (bucket->next);
        }
        else
        {
            break;
        }

    } while (1);
    return 0;
}

int concurrent_map_size(struct Concurrent_Map *map)

{
    int num_buckets = 0;

    struct Bucket *buckets = map->buckets;
    unsigned i;

    for (i = 0; i < map->capacity; i++)
    {
        struct Bucket bucket = buckets[i];
        do
        {
            num_buckets++;
            if (bucket.next != NULL)
            {
                bucket = *(bucket.next);
            }
            else
            {
                break;
            }

        } while (1);
    }
    return num_buckets * ENTRIES_PER_BUCKET;
}
struct FlowId
{
    int src_port;
    int dst_port;
    int src_ip;
    int dst_ip;
    int internal_device;
    int protocol;
};

bool FlowId_eq(void *a, void *b)

{
    if (a == NULL || b == NULL)
    {
        return false;
    }
    struct FlowId *id1 = a;
    struct FlowId *id2 = b;

    return (id1->src_port == id2->src_port) && (id1->dst_port == id2->dst_port) && (id1->src_ip == id2->src_ip) && (id1->dst_ip == id2->dst_ip) && (id1->internal_device == id2->internal_device) && (id1->protocol == id2->protocol);
}

unsigned FlowId_hash(void *obj)

{
    struct FlowId *id = obj;
    unsigned hash = 0;
    hash = __builtin_ia32_crc32si(hash, id->src_port);
    hash = __builtin_ia32_crc32si(hash, id->dst_port);
    hash = __builtin_ia32_crc32si(hash, id->src_ip);
    hash = __builtin_ia32_crc32si(hash, id->dst_ip);
    hash = __builtin_ia32_crc32si(hash, id->internal_device);
    hash = __builtin_ia32_crc32si(hash, id->protocol);
    return hash;
}

struct Concurrent_Map *concurrent_map;

#define NUM_THREADS 2
#define NUM_PACKETS 10000000

void *expirator(void *arg)
{
    // printf("Thread started executing\n");
    unsigned i = 0;
    int error = 0;
    unsigned packet_count = NUM_PACKETS / NUM_THREADS;
    while (i < packet_count)
    {
        i++;
        struct FlowId *id = malloc(sizeof(struct FlowId));
        struct FlowId *id1 = malloc(sizeof(struct FlowId));
        id->dst_ip = 1;
        id->src_ip = 1;
        id->internal_device = 1;
        id->protocol = 1;
        id->src_port = 1;
        id->dst_port = rand() % 65536;

        id1->dst_ip = 1;
        id1->src_ip = 1;
        id1->internal_device = 1;
        id1->protocol = 1;
        id1->src_port = 1;
        id1->dst_port = rand() % 65536;

        int external_port = rand() % 65536;
        int external;

        concurrent_map_erase(concurrent_map, id, NULL);

        concurrent_map_put(concurrent_map, id, external_port);
        concurrent_map_get(concurrent_map, id, &external);

        if (external_port != external)
        {
            error++;
        }
        else
        {
        }
    }
    return NULL;
}

int main()
{

    clock_t begin = clock();

    concurrent_map_allocate(FlowId_eq, FlowId_hash, 65536, &(concurrent_map));

    pthread_t *threads = malloc(sizeof(pthread_t) * NUM_THREADS);
    int i;
    for (i = 0; i < NUM_THREADS; i++)
    {
        if (pthread_create(&threads[i], NULL, expirator, NULL) != 0)
        {
            printf("Error creating threads");
            exit(0);
        }
    }
    for (i = 0; i < NUM_THREADS; i++)
    {
        if (pthread_join(threads[i], NULL) != 0)
        {
            printf("Error joining threads");
            exit(0);
        }
    }
    clock_t end = clock();
    double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
    printf("%lf\n", time_spent);
    return 0;
}

Вот как запустить эту программу.

gcc  concurrent_map.c  -o test-concurrent-new -lpthread -msse4.2 -O3

Затем я измеряю время выполнения для фиксированной рабочей нагрузки, и следующие значения времени я наблюдал.

1: 3.29

2: 6.687811

3: 5.88

4: 6,23

5: 6,38

6: 6,52

7: 6,74

8: 6,82

Кажется, что когда количество потоков увеличивается, время выполнения увеличивается и остается почти таким же.

Я профилировал этот код с помощью Mutrace, который ищет конфликт мьютексов. Оказывается,

Нет мьютекса в соответствии с параметрами фильтрации.

Я проверил количество пропусков кеша, и оказалось, что количество пропусков кеша примерно равнопри изменении количества потоков.

Почему время выполнения не уменьшается при увеличении количества потоков?

Я запускаю это на 32-ядерном компьютере

1 Ответ

0 голосов
/ 11 ноября 2019

rand () обычно не подходит для многопоточного исполнения. Вместо этого используйте rand_r ().

Также используйте инструмент времени linux для синхронизации приложения.

Создание рабочей нагрузки накладывает огромные накладные расходы, и я предполагаю, что это узкое место, а не одновременная карта хешей

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...