Для начала, в последнем потоке вам нужно [после установки end
]:
if (i == (threadCount - 1)) arguments.end = fileSize;
, чтобы получить все байты в последнем сегменте, а не go за пределами EOF. Это необходимо всякий раз, когда размер файла не является точным кратным числу потоков.
Чтобы предотвратить утечку памяти для argStruct
, передаваемого потоку, в нижней части processFile
необходимо free(input)
Кроме того, потоки совместно используют определенные ресурсы (см. man pthreads
). Примечательно, что дескрипторы файлов открываются. Итак, вам понадобится мьютекс вокруг вашего доступа к textFile
. (например) pthread_mutex_lock(&text_mutex);
et. al.
И каждый поток должен сделать свой собственный fseek
для той части файла, к которой он пытается получить доступ.
И, size
имеет «коэффициент спада», равный 10. Это нормально для безопасности выделенной области, но вызывает слишком много данных для чтения. Лучше либо пропустить + 10
, либо сделать: int exact_size = end - start;
. Кроме того, обратите внимание, что вы делаете «отстой» при установке size
и дополнительной суммы в malloc
, так что это не требуется.
Также обратите внимание, что fread
не не гарантирует EOS-символ (0x00) в конце буфера, как fgets
. Итак, если вы собираетесь выполнять строковые операции с буфером, вам нужно будет применить это самостоятельно (а вам понадобится "отстойник" по крайней мере 1):
Итак, нам нужно:
pthread_mutex_lock(&text_mutex);
fseek(textFile,start,0);
fread(readFile,1,exact_size,textFile);
readFile[exact_size] = 0;
pthread_mutex_unlock(&text_mutex);
И помните, что main
должен инициализировать text_mutex
с pthread_mutex_init
перед выполнением pthread_create
.
Но .. .
Использование fread
может быть проблематичным c. Когда вы разбиваете файл на фрагменты длиной divide
, вы, вероятно, рубите файл таким образом, чтобы у первого потока была обрезана последняя строка, а следующий поток увидит оставшуюся часть этой строки, прежде чем увидит свою первую полная строка и т. д. ...
Возможно, лучше всего сделать mmap
для всего файла в main
, и он просканирует буфер в поисках новой строки [или пробела], и дайте каждому потоку сегмент, который гарантированно будет соответствовать новым строкам.
ОБНОВЛЕНИЕ:
Я закодировал версию, которая использует mmap
и исправляет небольшую ошибку [прошу прощения за бесполезную очистку стиля]. Он компилируется без ошибок, даже с -O2 -Wall
[который вы всегда должны использовать для перехвата всех предупреждений]. Я не проверял, но это должно помочь вам.
#include <time.h>
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/mman.h>
char *filebuf;
typedef struct {
char word[50];
int count;
int totalWords;
} wordsStruct;
struct argStruct {
pthread_t threadid;
int start;
int end;
int count;
wordsStruct *words;
};
int
stringLength(char s[])
{
int c = 0;
while (s[c] != '\0')
c++;
return c;
}
void
groomString(char *line)
{
for (int i = 0; line[i] != '\0'; ++i) {
line[i] = tolower(line[i]);
while (!((line[i] >= 'a' && line[i] <= 'z') || line[i] == '\0')) {
for (int j = i; line[j] != '\0'; j++) {
line[j] = line[j + 1];
line[j] = '\0';
}
}
}
}
int
counter(int n)
{
static int test;
test = n;
// NOTE/BUG: this is the assignment operator and you want equality
#if 0
if (n = 0) {
#else
if (n == 0) {
#endif
return test;
}
else {
n++;
return n;
}
}
void
processFile(void *input)
{
struct argStruct *params = input;
wordsStruct *words = params->words;
int start = params->start;
int end = params->end;
#if 0
int count = params->count;
#endif
int size = (end - start) + 10;
char delim[] = " \t\v\n\r";
char *copy = malloc(size + 1);
memcpy(copy,&filebuf[start],size);
copy[size] = 0;
char *saveptr;
int inArray;
int length;
static int added;
char *token = strtok_r(copy, delim, &saveptr);
while (token) {
groomString(token);
length = stringLength(token);
if (length > 5) {
inArray = 0;
for (int i = 0; i < added; i++) {
if (strcmp(words[i].word, token) == 0) {
inArray = i;
}
}
if (inArray == 0) {
added++;
strcpy(words[added].word, token);
words[added].count = 1;
}
else {
words[inArray].count++;
}
}
token = strtok_r(NULL, delim, &saveptr);
}
words->totalWords = added;
free(copy);
free(token);
}
int
main(int argc, char *argv[])
{
int pfile;
int threadCount = 0,
fileSize = 0,
divide = 0;
struct stat st;
off_t curpos;
wordsStruct *allWords = (wordsStruct *) malloc(sizeof(wordsStruct));
if (argc > 2) {
pfile = open(argv[1],O_RDONLY);
if (pfile < 0) {
perror("FILE OPEN FAILURE");
}
threadCount = atoi(argv[2]);
struct argStruct *threads =
malloc(sizeof(struct argStruct) * threadCount);
struct argStruct *arg;
fstat(pfile,&st);
fileSize = st.st_size;
filebuf = mmap(NULL,fileSize,PROT_READ,MAP_PRIVATE,pfile,0);
divide = (fileSize / threadCount);
#if 0
int j = 0;
#endif
for (int i = 0; i < threadCount; i++) {
arg = &threads[i];
arg->words = allWords;
if (i == 0)
arg->start = 0;
else
arg->start = arg[-1].end;
curpos = arg->start + divide;
for (; curpos < fileSize; ++curpos) {
if (filebuf[curpos] == '\n') {
++curpos;
break;
}
}
if (curpos > fileSize)
curpos = fileSize;
arg->end = curpos;
arg->count = i;
}
for (int i = 0; i < threadCount; i++) {
arg = &threads[i];
pthread_create(&arg->threadid, NULL, (void *) processFile, arg);
}
for (int i = 0; i < threadCount + 1; i++) {
arg = &threads[i];
pthread_join(arg->threadid, NULL);
}
munmap(filebuf,fileSize);
close(pfile);
free(threads);
}
else {
printf("Please enter text file name and number of threads");
}
return 0;
}
ОБНОВЛЕНИЕ № 2:
Упс, я пропустил несколько вещей. ..
Поскольку added
определяется с static
, тогда все потоки пытаются использовать его. Они мчатся. Это было бы "плохо". И для списка params->words
индексы будут неправильными .
Для доступа к нему потребуется либо пара блокировки / разблокировки мьютекса в верхней / нижней части потока l oop или использование примитивов atomi c (например, stdatomic.h
).
Но пары мьютексов будут иметь тот эффект, что какой бы поток ни получил первый мьютекс, он "монополизирует" его, и все потоки будут работать, более или менее последовательно. Таким образом, победив цель иметь несколько потоков.
Итак, во-первых, мы хотим удалить static
, чтобы у каждого потока была своя собственная копия.
Но теперь получается, что words
(то есть params->words
) не"растет", когда мы добавляем новое слово в список. Таким образом, мы должны увеличивать размер списка всякий раз, когда добавляем новое слово, поэтому нам нужно добавить вызов realloc
.
Делать это в одном общем списке (например, где вы выделяете allWords
в main
) проблематично c. Из-за поиска и realloc
«критическая секция» кода, который должен быть защищен мьютексом, представляет собой почти все тело l oop.
Итак, одно решение состоит в том, чтобы каждый поток поддерживает свой собственный список для каждого потока (т. е. params->words
отличается для каждого потока). Затем потоки не участвуют в гонке и не требуют блокировки мьютекса во время работы.
Однако это будет означать наличие дубликатов между потоками.
Итак, после main
делает pthread_join
во всех потоках, main
должен воссоздать единый унифицированный список, который исключает дубликаты.
Использование массивов, это более громоздко. Связанный список может упростить рекомбинацию различных списков.
Самый простой способ - скопировать все записи из каждого списка для каждого потока, добавив его в большой список.
Затем отсортируйте это list.
Затем создайте новый список, который устраняет дубликаты.
Вот обновленная версия, которая устраняет эти проблемы [снова, не проверено]:
#include <time.h>
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/mman.h>
char *filebuf;
typedef struct {
char word[50];
int count;
#if 0
int totalWords;
#endif
} wordsStruct;
struct argStruct {
pthread_t threadid;
int start;
int end;
int count;
int totalWords;
wordsStruct *words;
};
int
stringLength(char s[])
{
int c = 0;
while (s[c] != '\0')
c++;
return c;
}
void
groomString(char *line)
{
for (int i = 0; line[i] != '\0'; ++i) {
line[i] = tolower(line[i]);
while (!((line[i] >= 'a' && line[i] <= 'z') || line[i] == '\0')) {
for (int j = i; line[j] != '\0'; j++) {
line[j] = line[j + 1];
line[j] = '\0';
}
}
}
}
int
counter(int n)
{
static int test;
test = n;
// NOTE/BUG: this is the assignment operator and you want equality
#if 0
if (n = 0) {
#else
if (n == 0) {
#endif
return test;
}
else {
n++;
return n;
}
}
void *
processFile(void *input)
{
struct argStruct *params = input;
int start = params->start;
int end = params->end;
int size = (end - start) + 10;
char delim[] = " \t\v\n\r";
char *copy = malloc(size + 1);
memcpy(copy,&filebuf[start],size);
copy[size] = 0;
char *saveptr;
int inArray;
int length;
char *token = strtok_r(copy, delim, &saveptr);
int added = 0;
params->words = NULL;
params->count = 0;
while (token) {
groomString(token);
length = stringLength(token);
if (length > 5) {
wordsStruct *words = params->words;
// try to find an existing word struct for the current token
inArray = 0;
for (int i = 0; i < added; i++) {
if (strcmp(words[i].word, token) == 0) {
inArray = i;
break;
}
}
// found a token that is already in the words list -- just increment
// the count
if (inArray != 0) {
++words[inArray].count;
continue;
}
// add a new word struct to the list
++added;
words = realloc(words,sizeof(wordsStruct) * added);
params->words = words;
// fill it in and initialize its count
words += added;
strcpy(words->word, token);
words->count = 1;
}
token = strtok_r(NULL, delim, &saveptr);
}
params->totalWords = added;
free(copy);
free(token);
return (void *) 0;
}
int
wordcmp(const void *a,const void *b)
{
const wordsStruct *wa = a;
const wordsStruct *wb = b;
int cmpflg = strcmp(wa->word,wb->word);
return cmpflg;
}
int
main(int argc, char *argv[])
{
int pfile;
int threadCount = 0,
fileSize = 0,
divide = 0;
struct stat st;
off_t curpos;
#if 0
wordsStruct *allWords = malloc(sizeof(wordsStruct));
#endif
if (argc <= 2) {
printf("Please enter text file name and number of threads");
return 1;
}
pfile = open(argv[1],O_RDONLY);
if (pfile < 0) {
perror("FILE OPEN FAILURE");
}
threadCount = atoi(argv[2]);
struct argStruct *threads =
malloc(sizeof(struct argStruct) * threadCount);
struct argStruct *arg;
fstat(pfile,&st);
fileSize = st.st_size;
filebuf = mmap(NULL,fileSize,PROT_READ,MAP_PRIVATE,pfile,0);
divide = (fileSize / threadCount);
#if 0
int j = 0;
#endif
for (int i = 0; i < threadCount; i++) {
arg = &threads[i];
#if 0
arg->words = allWords;
#endif
if (i == 0)
arg->start = 0;
else
arg->start = arg[-1].end;
curpos = arg->start + divide;
for (; curpos < fileSize; ++curpos) {
if (filebuf[curpos] == '\n') {
++curpos;
break;
}
}
if (curpos > fileSize)
curpos = fileSize;
arg->end = curpos;
arg->count = i;
}
for (int i = 0; i < threadCount; i++) {
arg = &threads[i];
pthread_create(&arg->threadid, NULL, (void *) processFile, arg);
}
for (int i = 0; i < threadCount; i++) {
arg = &threads[i];
pthread_join(arg->threadid, NULL);
}
munmap(filebuf,fileSize);
close(pfile);
// get total number of words in all lists
int totalcnt = 0;
for (int i = 0; i < threadCount; i++) {
arg = &threads[i];
totalcnt += arg->totalWords;
}
// create a unified list [that may have duplicates]
wordsStruct *biglist = malloc(sizeof(wordsStruct) * totalcnt);
int bigidx = 0;
for (int i = 0; i < threadCount; i++) {
arg = &threads[i];
for (int wordidx = 0; wordidx < arg->totalWords; ++wordidx)
biglist[bigidx++] = arg->words[wordidx];
free(arg->words);
}
free(threads);
// sort the list
qsort(biglist,totalcnt,sizeof(wordsStruct),wordcmp);
// remove duplicates
int cleancnt = 0;
wordsStruct *cleanlist = malloc(sizeof(wordsStruct) * totalcnt);
if (totalcnt > 0)
cleanlist[cleancnt++] = biglist[0];
for (int bigidx = 1; bigidx < totalcnt; ++bigidx) {
if (strcmp(cleanlist[cleancnt - 1].word,biglist[bigidx].word) == 0)
continue;
cleanlist[cleancnt++] = biglist[bigidx];
}
free(biglist);
// trim the list
cleanlist = realloc(cleanlist,sizeof(wordsStruct) * cleancnt);
return 0;
}