Я пытаюсь построить эффективную параллельную хэш-карту, используя pthreads, C.
Ниже приведена моя реализация
#include <stdlib.h>
#include <stddef.h>
#include <pthread.h>
#include <stdint.h>
#include <limits.h>
#include <stdio.h>
#include <linux/limits.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>
#define ENTRIES_PER_BUCKET 3
struct Bucket
{
pthread_mutex_t mutex;
void **keys;
int *vals;
struct Bucket *next;
};
struct Concurrent_Map
{
struct Bucket *buckets;
map_keys_equality *keys_eq;
map_key_hash *khash;
int capacity;
};
int concurrent_map_allocate /*@ <t> @*/ (map_keys_equality *keq, map_key_hash *khash,
unsigned capacity,
struct Concurrent_Map **map_out)
{
struct Concurrent_Map *old_map_val = *map_out;
struct Concurrent_Map *map_alloc = malloc(sizeof(struct Concurrent_Map));
if (map_alloc == NULL)
{
return 0;
}
*map_out = (struct Concurrent_Map *)map_alloc;
struct Bucket *buckets_alloc = (struct Bucket *)malloc(sizeof(struct Bucket) * (int)capacity);
if (buckets_alloc == NULL)
{
free(map_alloc);
*map_out = old_map_val;
return 0;
}
(*map_out)->buckets = buckets_alloc;
(*map_out)->capacity = capacity;
(*map_out)->keys_eq = keq;
(*map_out)->khash = khash;
unsigned i;
for (i = 0; i < capacity; i++)
{
if (pthread_mutex_init(&((*map_out)->buckets[i].mutex), NULL) == 0)
{
void **key_alloc = malloc(sizeof(void *) * (ENTRIES_PER_BUCKET));
if (key_alloc != NULL)
{
(*map_out)->buckets[i].keys = key_alloc;
int k;
for (k = 0; k < ENTRIES_PER_BUCKET; k++)
{
(*map_out)->buckets[i].keys[k] = NULL;
}
}
int *vals_alloc = malloc(sizeof(int) * (ENTRIES_PER_BUCKET));
if (vals_alloc != NULL)
{
(*map_out)->buckets[i].vals = vals_alloc;
int k;
for (k = 0; k < ENTRIES_PER_BUCKET; k++)
{
(*map_out)->buckets[i].vals[k] = -1;
}
}
(*map_out)->buckets[i].next = NULL;
}
}
// todo exceptions in allocation
return 1;
}
static unsigned loop(unsigned k, unsigned capacity)
{
unsigned g = k % capacity;
unsigned res = (g + capacity) % capacity;
return res;
}
int concurrent_map_get(struct Concurrent_Map *map, void *key, int *value_out)
{
map_key_hash *khash = map->khash;
unsigned hash = khash(key);
unsigned start = loop(hash, map->capacity);
unsigned bucket_index = loop(start + 0, map->capacity);
if (bucket_index < map->capacity)
{
struct Bucket *bucket = &(map->buckets[bucket_index]);
pthread_mutex_t mutex = bucket->mutex;
pthread_mutex_lock(&mutex);
int j;
do
{
for (j = 0; j < ENTRIES_PER_BUCKET; j++)
{
int val = bucket->vals[j];
if (map->keys_eq(bucket->keys[j], key))
{
if (bucket->vals[j] == val)
{
*value_out = val;
return 1;
}
else
{
*value_out = -1;
return 0;
}
}
}
if (bucket->next != NULL)
{
bucket = (bucket->next);
}
else
{
break;
pthread_mutex_unlock(&mutex);
}
pthread_mutex_unlock(&mutex);
} while (1);
}
*value_out = -1;
return 0;
}
int concurrent_map_put(struct Concurrent_Map *map, void *key, int value)
{
map_key_hash *khash = map->khash;
unsigned hash = khash(key);
unsigned start = loop(hash, map->capacity);
unsigned bucket_index = loop(start + 0, map->capacity);
struct Bucket *bucket = &(map->buckets[bucket_index]);
int j;
do
{
pthread_mutex_t mutex = bucket->mutex;
int j;
pthread_mutex_lock(&mutex);
for (j = 0; j < ENTRIES_PER_BUCKET; j++)
{
if (map->keys_eq(bucket->keys[j], key))
{
pthread_mutex_unlock(&mutex);
return 0;
}
else if (bucket->keys[j] == NULL)
{
bucket->vals[j] = value;
bucket->keys[j] = key;
pthread_mutex_unlock(&mutex);
return 1;
}
}
if (bucket->next == NULL)
{
// allocate a new bucket
struct Bucket *new_bucket = malloc(sizeof(struct Bucket));
if (pthread_mutex_init(&(new_bucket->mutex), NULL) == 0)
{
void **key_alloc = malloc(sizeof(void *) * (ENTRIES_PER_BUCKET));
if (key_alloc != NULL)
{
new_bucket->keys = key_alloc;
int k;
for (k = 0; k < ENTRIES_PER_BUCKET; k++)
{
new_bucket->keys[k] = NULL;
}
}
int *vals_alloc = malloc(sizeof(int) * (ENTRIES_PER_BUCKET));
if (vals_alloc != NULL)
{
new_bucket->vals = vals_alloc;
int k;
for (k = 0; k < ENTRIES_PER_BUCKET; k++)
{
new_bucket->vals[k] = -1;
}
}
bucket->next = new_bucket;
}
}
pthread_mutex_unlock(&mutex);
bucket = bucket->next;
} while (1);
return 0;
}
int concurrent_map_erase(struct Concurrent_Map *map, void *key, void **trash)
{
map_key_hash *khash = map->khash;
unsigned hash = khash(key);
unsigned start = loop(hash, map->capacity);
unsigned bucket_index = loop(start + 0, map->capacity);
struct Bucket *bucket = &(map->buckets[bucket_index]);
int j;
do
{
pthread_mutex_t mutex = bucket->mutex;
int j;
pthread_mutex_lock(&mutex);
for (j = 0; j < ENTRIES_PER_BUCKET; j++)
{
if (map->keys_eq(bucket->keys[j], key))
{
bucket->vals[j] = -1;
bucket->keys[j] = NULL;
pthread_mutex_unlock(&mutex);
return 1;
}
}
pthread_mutex_unlock(&mutex);
if (bucket->next != NULL)
{
bucket = (bucket->next);
}
else
{
break;
}
} while (1);
return 0;
}
int concurrent_map_size(struct Concurrent_Map *map)
{
int num_buckets = 0;
struct Bucket *buckets = map->buckets;
unsigned i;
for (i = 0; i < map->capacity; i++)
{
struct Bucket bucket = buckets[i];
do
{
num_buckets++;
if (bucket.next != NULL)
{
bucket = *(bucket.next);
}
else
{
break;
}
} while (1);
}
return num_buckets * ENTRIES_PER_BUCKET;
}
struct FlowId
{
int src_port;
int dst_port;
int src_ip;
int dst_ip;
int internal_device;
int protocol;
};
bool FlowId_eq(void *a, void *b)
{
if (a == NULL || b == NULL)
{
return false;
}
struct FlowId *id1 = a;
struct FlowId *id2 = b;
return (id1->src_port == id2->src_port) && (id1->dst_port == id2->dst_port) && (id1->src_ip == id2->src_ip) && (id1->dst_ip == id2->dst_ip) && (id1->internal_device == id2->internal_device) && (id1->protocol == id2->protocol);
}
unsigned FlowId_hash(void *obj)
{
struct FlowId *id = obj;
unsigned hash = 0;
hash = __builtin_ia32_crc32si(hash, id->src_port);
hash = __builtin_ia32_crc32si(hash, id->dst_port);
hash = __builtin_ia32_crc32si(hash, id->src_ip);
hash = __builtin_ia32_crc32si(hash, id->dst_ip);
hash = __builtin_ia32_crc32si(hash, id->internal_device);
hash = __builtin_ia32_crc32si(hash, id->protocol);
return hash;
}
struct Concurrent_Map *concurrent_map;
#define NUM_THREADS 2
#define NUM_PACKETS 10000000
void *expirator(void *arg)
{
// printf("Thread started executing\n");
unsigned i = 0;
int error = 0;
unsigned packet_count = NUM_PACKETS / NUM_THREADS;
while (i < packet_count)
{
i++;
struct FlowId *id = malloc(sizeof(struct FlowId));
struct FlowId *id1 = malloc(sizeof(struct FlowId));
id->dst_ip = 1;
id->src_ip = 1;
id->internal_device = 1;
id->protocol = 1;
id->src_port = 1;
id->dst_port = rand() % 65536;
id1->dst_ip = 1;
id1->src_ip = 1;
id1->internal_device = 1;
id1->protocol = 1;
id1->src_port = 1;
id1->dst_port = rand() % 65536;
int external_port = rand() % 65536;
int external;
concurrent_map_erase(concurrent_map, id, NULL);
concurrent_map_put(concurrent_map, id, external_port);
concurrent_map_get(concurrent_map, id, &external);
if (external_port != external)
{
error++;
}
else
{
}
}
return NULL;
}
int main()
{
clock_t begin = clock();
concurrent_map_allocate(FlowId_eq, FlowId_hash, 65536, &(concurrent_map));
pthread_t *threads = malloc(sizeof(pthread_t) * NUM_THREADS);
int i;
for (i = 0; i < NUM_THREADS; i++)
{
if (pthread_create(&threads[i], NULL, expirator, NULL) != 0)
{
printf("Error creating threads");
exit(0);
}
}
for (i = 0; i < NUM_THREADS; i++)
{
if (pthread_join(threads[i], NULL) != 0)
{
printf("Error joining threads");
exit(0);
}
}
clock_t end = clock();
double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
printf("%lf\n", time_spent);
return 0;
}
Вот как запустить эту программу.
gcc concurrent_map.c -o test-concurrent-new -lpthread -msse4.2 -O3
Затем я измеряю время выполнения для фиксированной рабочей нагрузки, и следующие значения времени я наблюдал.
1: 3.29
2: 6.687811
3: 5.88
4: 6,23
5: 6,38
6: 6,52
7: 6,74
8: 6,82
Кажется, что когда количество потоков увеличивается, время выполнения увеличивается и остается почти таким же.
Я профилировал этот код с помощью Mutrace, который ищет конфликт мьютексов. Оказывается,
Нет мьютекса в соответствии с параметрами фильтрации.
Я проверил количество пропусков кеша, и оказалось, что количество пропусков кеша примерно равнопри изменении количества потоков.
Почему время выполнения не уменьшается при увеличении количества потоков?
Я запускаю это на 32-ядерном компьютере