Так вот что я пытаюсь достичь.У меня есть список рабочих элементов, которые необходимо передать узлу 0 на узлы 1, 2 и 3 посредством передачи сообщений MPI.
Для узлов 1, 2 и 3 в каждом узле есть 2 потока считывателей, которые будут непрерывноping узел 0 для работы и получит работу от узла 0 соответственно.
Прямо сейчас, если у меня только 1 поток в узле 0, то есть узел 0 будет только слушать запросы от других узлов через MPI_Recv и отправлять работу через MPI_Send, тогда все работает нормально.
Однако, если я также хочу, чтобы на узле 0 было 2 потока считывателей, которые также извлекали работы из списка (эти 2 потока являются дополнением к основному потоку, который обрабатывает распределение работы, описанное выше), как мне это сделать?
Я пытался использовать #pragma omp параллельно и позволить главному потоку (omp_get_thread_num == 0) обрабатывать распределение работы, в то время как другие 2 потока используют #pragma omp критический для доступа к списку, чтобы получить работу, но это приводит к ошибке завершения INTEL, когда я пытался его запустить.
Спасибо!
if (pid == 0) {vector list_file;// список имен файлов для распространения
// pre-populate list_file with special file name "LIST_COMPLETED" so that when a node receives this file name it knows that there is no more file to be requested from node 0
for (int i = 0; i < (numP)*num_readers; i++)
{
list_file.push_back(request_termination_message);
}
// add the file names to list_file
GetInputFile(argv, list_file); // argv[1] is a text file that contains the input text files to be processed, all the text file names will be added to list_file
int num_file_remaining = list_file.size(); // number of file remained in the queue to be processed
#pragma omp parallel num_threads(num_readers + 1)
{
if (omp_get_thread_num == 0)
{
MPI_Status request_stats; // for MPI_recv
// listen to request for file from other nodes as long as there is file left in list_file
while (num_file_remaining > 0)
{
MPI_Recv(NULL, 0, MPI_INT, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &request_stats); // listen to request for file
MPI_Send(list_file.back(), 5 * MAX_WORD_LENGTH, MPI_CHAR, request_stats.MPI_SOURCE, request_stats.MPI_SOURCE, MPI_COMM_WORLD); // send the file to respective node
list_file.pop_back(); // remove the file that was just sent
num_file_remaining -= 1; // reduce the number of file remained in the queue
}
}
else
{
char* file_name;
while (strcmp(file_name, request_termination_message) != 0)
{
#pragma omp critical
{
file_name = list_file.back();
list_file.pop_back();
num_file_remaining -= 1;
}
}
}
}
}