Как ускорить мьютекс C? - PullRequest
       87

Как ускорить мьютекс C?

0 голосов
/ 12 апреля 2020

У меня есть этот неправильный код.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define MAX 1000

struct TContext {
    const char* Name;
    int* Counter;
    int Mod;
};

void* ThreadFunc(void* arg) {
    struct TContext* ctxt = arg;
    int* counter = ctxt->Counter;
    fprintf(stderr, "This is %s thread\n", ctxt->Name);
    while (*counter < MAX) {
        if (*counter % 2 == ctxt->Mod) {
            printf("%d ", (*counter)++);
        }
    }
    pthread_exit(0);
}

int main()
{
    pthread_t t1;
    pthread_t t2;

    int counter = 0;
    struct TContext ctxt1 = {"even", &counter, 0};
    struct TContext ctxt2 = {"odd", &counter, 1};
    pthread_create(&t1, 0, ThreadFunc, &ctxt1);
    pthread_create(&t2, 0, ThreadFunc, &ctxt2);

    pthread_join(t1, 0);
    pthread_join(t2, 0);
    printf("\n");
    return 0;
}

Моя цель - синхронизировать его и получить последовательность 0, 1, 2, 3, 4, 5 ....

Я пытаюсь заблокировать и разблокировать мьютекс таким образом

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void* ThreadFunc(void* arg) {
  struct TContext* ctxt = arg;
  int* counter = ctxt->Counter;
  fprintf(stderr, "This is %s thread\n", ctxt->Name);
  while (*counter < MAX) {
    if (*counter % 2 == ctxt->Mod) {
      pthread_mutex_lock(&mutex);
      printf("%d ", (*counter)++);
      pthread_mutex_unlock(&mutex);
    }
  }
  pthread_exit(0);
}

Но он работает очень медленно (у меня есть tl за одну секунду).

Как я могу синхронизировать этот код более эффективным способом? Или, может быть, я могу оптимизировать C -mutex?

Ответы [ 3 ]

1 голос
/ 12 апреля 2020

Немного более традиционный способ, чем Крис Холлс :

pthread_cond_t  cv;
pthread_mutex_t lock;

void* ThreadFunc(void* arg) {
    struct TContext* ctxt = arg;
    int* counter = ctxt->Counter;
    fprintf(stderr, "This is %s thread\n", ctxt->Name);
    pthread_mutex_lock(&lock);
    while (*counter < MAX) {
        if (*counter % 2 == ctxt->Mod) {
            printf("%d ", (*counter)++);
            pthread_cond_broadcast(&cv);
        } else {
            pthread_cond_wait(&cv, &lock);
        }
    }
    pthread_mutex_unlock(&lock);
    pthread_exit(0);
}

и в основном:

pthread_mutex_init(&lock, 0);
pthread_cond_init(&cv, 0);

где-то до создания потоков. Это также позволяет вам добавлять произвольное количество четных + нечетных потоков без помех (хотя без ускорения, просто интеллектуальное любопытство).

1 голос
/ 12 апреля 2020

Я предлагаю:

void* ThreadFunc(void* arg) {
    struct TContext* ctxt = arg;
    volatile int* counter = ctxt->Counter;
    fprintf(stderr, "This is %s thread\n", ctxt->Name);

    while (1)
    {
        int count ;

        count = *counter ;     // NB: volatile*

        if (count >= MAX)
          break ;

         if ((count % 2) == ctxt->Mod)
         {
             printf("%d ", count) ;
             *counter = count + 1 ;
         } ;
    } ;
    pthread_exit(0);
}

Что, по крайней мере для x86 / x86_64, даст тот эффект, который, я думаю, вы искали, а именно, что два потока по очереди увеличивают счетчик.

Действительно интересный вопрос: почему это работает: -)


Постскриптум

Приведенный выше код критически зависит от четырех вещей:

  1. между потоками используется только одно значение - счетчик,

  2. счетчик одновременно управляет данными и - ls бит счетчика сигнализирует, какой поток должен продолжаться.

  3. чтение и запись счетчика должны быть атомами c - поэтому каждое чтение счетчика считывает последнее записанное значение (а не некоторые комбинация предыдущей и текущей записи).

  4. компилятор должен выдать код для фактического чтения / записи счетчика из / в память внутри l oop.

Теперь (1) и (2) определяют c для этой конкретной проблемы. (3) обычно верно для int (хотя может потребоваться правильное выравнивание). (4) достигается путем определения счетчика как volatile.

Итак, я изначально сказал, что это будет работать "по крайней мере для x86 / x86_64", потому что я знаю (3) верно для этих устройств, но Я также считаю, что это справедливо для многих (большинства?) Распространенных устройств.

Более чистая реализация будет определять счетчик _Atomic следующим образом:

#include <stdatomic.h>

void* ThreadFunc(void* arg) {
    struct TContext* ctxt = arg;
    atomic_int* counter = ctxt->Counter;
    fprintf(stderr, "This is %s thread\n", ctxt->Name);

    while (1)
    {
        int count ;

        count = atomic_load_explicit(counter, memory_order_relaxed) ;

        if (count > MAX)        // printing up to and including MAX
          break ;

         if ((count % 2) == ctxt->Mod)
         {
             printf("%d ", count) ;
             atomic_store_explicit(counter, count + 1, memory_order_relaxed) ;
         } ;
    } ;
    pthread_exit(0);
}

Что делает (3) и (4) явный. Но обратите внимание, что (1) и (2) по-прежнему означают, что нам не нужно упорядочивать память. Каждый раз, когда каждый поток читает счетчик, бит0 сообщает ему, «владеет» ли он счетчиком. Если он не владеет счетчиком, поток зацикливается, чтобы прочитать его снова. Если он владеет счетчиком, он использует значение, а затем записывает новое значение - и поскольку он передает «владение», он возвращается к чтению l oop (он не может ничего делать со счетчиком, пока не «владеет» им снова). После того, как MAX + 1 будет записано в счетчик, ни один из потоков не будет его использовать или изменять, так что это тоже безопасно.

Брат Занятый русский правильный, здесь есть «гонка данных», но это разрешается зависимостью данных, особенно в этом случае.


В целом

Приведенный выше код не очень полезен, если у вас нет других приложений с одним общим значением. Но это можно обобщить, используя операции memory_order_acquire и memory_order_acquire atomi c.

Предположим, у нас есть некоторый struct shared, который содержит некоторое (нетривиальное) количество данных, которые будет генерировать один поток, и другой будет потреблять. Предположим, мы снова используем atomic_uint counter (изначально ноль) для управления доступом к данному struct shared parcel. Теперь у нас есть поток производителя, который:

void* ThreadProducerFunc(void* arg) 
{
    atomic_uint counter = &count ;     // somehow
    ....
    while (1)
    {
        uint count ;

        do
          count = atomic_load_explicit(counter, memory_order_acquire) ;
        while ((count & 1) == 1) ;

        ... fill the struct shared parcel, somehow ...

        atomic_store_explicit(counter, count + 1, memory_order_release) ;
    } ;
    ....
}

и поток потребителя, который:

void* ThreadConsumerFunc(void* arg) 
{
    atomic_uint counter = &count ;     // somehow
    ....
    while (1)
    {
        uint count ;

        do
          count = atomic_load_explicit(counter, memory_order_acquire) ;
        while ((count & 1) == 0) ;

        ... empty the struct shared parcel, somehow ...

        atomic_store_explicit(counter, count + 1, memory_order_release) ;
    } ;
    ....
}

Операции получения нагрузки синхронизируются с операциями освобождения хранилища, поэтому:

  • в производителе: заполнение участка не начнется до тех пор, пока производитель не получит «владение» (как указано выше), а затем «завершится» (записи станут видимыми для другого потока) до счет обновляется (и новое значение становится видимым для другого потока).

  • у потребителя: опорожнение посылки не начнется, пока потребитель не получит «владение» (как указано выше) и затем «завершится» (все чтения будут считаны из памяти) до счетчик обновляется (и новое значение становится видимым для другого потока).

Очевидно, что два потока заняты, ожидая друг друга. Но с двумя или более посылками и счетчиками потоки могут развиваться со скоростью медленнее.


Наконец - x86 / x86_64 и получение / выпуск

С x86 / x86_64, все чтения и записи в память - это неявное чтение-чтение и запись-отпускание. Это означает, что накладные расходы равны нулю в atomic_load_explicit(..., memory_order_acquire) и atomic_store_explicit(..., memory_order_release).

И наоборот, все операции чтения-изменения-записи (и операции memory_order_seq_cst) несут накладные расходы в течение нескольких десятков часов - 30 ?, 50 ?, больше, если операция разрешена (зависит от устройства).

Итак, где производительность имеет решающее значение, возможно, стоит понять, что возможно (а что нет).

0 голосов
/ 12 апреля 2020

Как я могу синхронизировать этот код более эффективным способом?

Вы не можете: код принципиально неэффективен.

проблема в том, что объем выполняемой вами работы (увеличение целого числа) ничтожен по сравнению с накладными расходами синхронизации, поэтому последний доминирует.

Чтобы решить проблему, вам нужно выполнять больше работы для каждой блокировки / разблокировки pair.

В реальной программе каждый поток должен выполнять 1000 или 10000 «рабочих элементов» для каждой итерации блокировки / разблокировки. Что-то вроде:

lock;
const int start = *ctx->Counter;
*ctx->Counter += N;
unlock;

for (int j = start; j < start + N; j++) /* do work on j-th iteration here */;

Но ваша игрушечная программа не поддается этому.

Или, может быть, я могу оптимизировать C -mutex?

Я предлагаю сначала попробовать правильный мьютекс. Вы быстро обнаружите, что это далеко не тривиально.

...