while(1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);
if(message_len == -1) { /* error handling... */}
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if(ret) { /* error handling ... */}
}
void * handle_message (void * message) {
puts((char *) message);
return NULL;
}
Приведенный выше пример не является MRE, но он чрезвычайно прост:
У меня есть основной поток с al oop, который постоянно получает сообщения из очереди сообщений. После получения нового сообщения оно сохраняется в локальном буфере message_buffer
. Затем создается новый поток, чтобы «позаботиться» о новом сообщении, и, таким образом, адрес буфера сообщений передается в handle_message
, который впоследствии выполняет новый поток.
Проблема
Часто 2 потока будут печатать одно и то же сообщение, хотя я могу со 100% уверенностью убедиться, что сообщения в очереди не были одинаковыми.
Я не совсем уверен, но мне кажется, я понимаю, почему это происходит:
говорит, что я посылаю sh 2 разных сообщения в mqueue, и только тогда я начинаю их потреблять.
На первой итерации while
l oop сообщение будет получено из очереди и сохранено в message_buffer
. Будет создан новый поток, и ему будет передан адрес message_length
. Но этот поток может быть недостаточно быстрым, чтобы вывести содержимое буфера в поток до того, как будет использовано следующее сообщение (на следующей итерации l oop), а затем содержимое message_buffer
будет переопределено , Таким образом, первый и второй поток теперь печатают одно и то же значение.
Мой вопрос: каков наиболее эффективный способ решить эту проблему? Я довольно плохо знаком с параллельным программированием и многопоточностью / pthreads, и меня немного ошеломляют различные примитивы синхронизации.
Проблемы с мьютексом
static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
while(1) {
char message_buffer[SIZE];
pthread_mutex_lock(&m);
ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);
pthred_mutex_unlock(&m);
if(message_len == -1) { /* error handling... */}
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if(ret) { /* error handling ... */}
}
void * handle_message (void * message) {
char own_buffer[SIZE];
pthread_mutex_lock(&m);
strncpy(own_buffer, (char *) message, SIZE);
pthread_mutex_unlock(&m);
puts(own_buffer);
return NULL;
}
Я не думаю, что моя текущая реализация мьютекса верно, поскольку потоки все еще получают повторяющиеся сообщения. Основной поток может заблокировать мьютекс, получить сообщение в буфер, разблокировать мьютекс, порождать поток, но этот поток все еще может зависать, и основной может просто переписать буфер снова (так как новый мьютекс буфера никогда не блокировался новым что делает мою текущую реализацию мьютекса бесполезной? Как мне это преодолеть?