Как добиться безблокировочного, но блокирующего поведения? - PullRequest
15 голосов
/ 22 мая 2011

Я внедряю одиночную очередь без единого производителя без блокировки для интенсивного сетевого приложения.У меня есть куча рабочих потоков, получающих работу в своих отдельных очередях, которые они затем снимают и обрабатывают.

Удаление блокировок из этих очередей значительно улучшило производительность при высокой нагрузке, , но они больше не работаютблокировать, когда очереди пустые , что, в свою очередь, приводит к стремительному увеличению загрузки ЦП.

Как эффективно заставить поток блокироваться до тех пор, пока он не сможет успешно удалить что-либо из очереди или не будет уничтожен / прерван?

Ответы [ 6 ]

16 голосов
/ 22 мая 2011

Если вы работаете в Linux, используйте Futex .Он обеспечивает производительность реализации без блокировки, используя атомарные операции, а не вызовы ядра, как мьютекс, но если вам нужно установить процесс в режим ожидания из-за того, что какое-то условие не выполняется (т. Е. Блокировка-конфликт), он будетзатем сделайте соответствующие вызовы ядра, чтобы перевести процесс в спящий режим и вернуть его в рабочее состояние в будущем.По сути, это очень быстрый семафор.

9 голосов
/ 22 мая 2011

В Linux futex может использоваться для блокировки потока. Но имейте в виду, что Futexes являются хитрыми !

ОБНОВЛЕНИЕ: условные переменные гораздо безопаснее использовать, чем фьютексы, и они более переносимы. Однако условная переменная используется в сочетании с мьютексом, поэтому, строго говоря, результат больше не будет свободным от блокировки. Однако, если вашей основной целью является производительность (а не гарантия глобального прогресса), а заблокированная часть (то есть условие проверки после пробуждения потока) мала, может случиться так, что вы получите удовлетворительные результаты без необходимости углубляться в тонкости интеграции фьютексов в алгоритм.

7 голосов
/ 23 мая 2011

Если вы работаете в Windows, вы не сможете использовать фьютексы, но в Windows Vista есть аналогичный механизм, называемый Keyed Events . К сожалению, это не является частью опубликованного API (это собственный API NTDLL), но вы можете использовать его, если вы принимаете предупреждение, которое может измениться в будущих версиях Windows (и вам не нужно работать на ядра до Vista). Обязательно прочитайте статью, на которую я ссылался выше. Вот непроверенный набросок того, как это может работать:

/* Interlocked SList queue using keyed event signaling */

struct queue {
    SLIST_HEADER slist;
    // Note: Multiple queues can (and should) share a keyed event handle
    HANDLE keyed_event;
    // Initial value: 0
    // Prior to blocking, the queue_pop function increments this to 1, then
    // rechecks the queue. If it finds an item, it attempts to compxchg back to
    // 0; if this fails, then it's racing with a push, and has to block
    LONG block_flag;
};

void init_queue(queue *qPtr) {
    NtCreateKeyedEvent(&qPtr->keyed_event, -1, NULL, 0);
    InitializeSListHead(&qPtr->slist);
    qPtr->blocking = 0;
}

void queue_push(queue *qPtr, SLIST_ENTRY *entry) {
    InterlockedPushEntrySList(&qPtr->slist, entry);

    // Transition block flag 1 -> 0. If this succeeds (block flag was 1), we
    // have committed to a keyed-event handshake
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
    if (oldv) {
        NtReleaseKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    }
}

SLIST_ENTRY *queue_pop(queue *qPtr) {
    SLIST_ENTRY *entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry)
        return entry; // fast path

    // Transition block flag 0 -> 1. We must recheck the queue after this point
    // in case we race with queue_push; however since ReleaseKeyedEvent
    // blocks until it is matched up with a wait, we must perform the wait if
    // queue_push sees us
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 1, 0);

    assert(oldv == 0);

    entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry) {
        // Try to abort
        oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
        if (oldv == 1)
            return entry; // nobody saw us, we can just exit with the value
    }

    // Either we don't have an entry, or we are forced to wait because
    // queue_push saw our block flag. So do the wait
    NtWaitForKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    // block_flag has been reset by queue_push

    if (!entry)
        entry = InterlockedPopEntrySList(&qPtr->slist);
    assert(entry);

    return entry;
}

Вы также можете использовать аналогичный протокол, используя Slim Read Write блокировки и Условные переменные , с быстрым путем без блокировки. Это обертки для событий с ключами, поэтому они могут потребовать больше ресурсов, чем непосредственное использование событий с ключами.

1 голос
/ 16 июня 2011

Вы можете заставить поток спать, используя функцию sigwait (). Вы можете разбудить поток с помощью pthread_kill. Это намного быстрее, чем условные переменные.

1 голос
/ 22 мая 2011

Вы пробовали условное ожидание? Когда очередь опустеет, просто начните ждать нового задания. Поток, помещающий задания в очередь, должен запустить сигнал. Таким образом, вы используете блокировки только тогда, когда очередь пуста.

https://computing.llnl.gov/tutorials/pthreads/#ConditionVariables

0 голосов
/ 22 мая 2011

Вы можете добавить сна, пока он ждет. Просто выберите самое большое ожидание, которое вы готовы иметь, затем сделайте что-то вроде этого (псевдокод, потому что я не помню синтаксис pthread):

WAIT_TIME = 100; // Set this to whatever you're happy with
while(loop_condition) {
   thing = get_from_queue()
   if(thing == null) {
       sleep(WAIT_TIME);
   } else {
       handle(thing);
   }
}

Даже что-то короткое, например, 100 мс сна, должно значительно снизить нагрузку на процессор. Я не уверен, в какой момент переключение контекста сделает его хуже, чем занятое ожидание.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...