Оптимизация общего буфера в многопоточной среде производителя / потребителя - PullRequest
3 голосов
/ 05 мая 2010

У меня есть проект, в котором у меня есть один поток производителя, который записывает события в буфер, и дополнительный единственный потребительский поток, который принимает события из буфера. Моя цель - оптимизировать эту вещь для одноядерной машины для достижения максимальной производительности.

В настоящее время я использую простой кольцевой буфер без блокировки (блокировка возможна, поскольку у меня только один потребительский поток и один поток производителя, и поэтому указатели обновляются только одним потоком).

#define BUF_SIZE 32768

struct buf_t { volatile int writepos; volatile void * buffer[BUF_SIZE]; 
    volatile int readpos;) };

void produce (buf_t *b, void * e) {
    int next = (b->writepos+1) % BUF_SIZE;
    while (b->readpos == next); // queue is full. wait
    b->buffer[b->writepos] = e; b->writepos = next;
}

void * consume (buf_t *b) {
    while (b->readpos == b->writepos); // nothing to consume. wait
    int next = (b->readpos+1) % BUF_SIZE;
    void * res = b->buffer[b->readpos]; b->readpos = next;
    return res;
}

buf_t *alloc () {
    buf_t *b = (buf_t *)malloc(sizeof(buf_t));
    b->writepos = 0; b->readpos = 0; return b;
}

Однако эта реализация еще не достаточно быстра и должна быть оптимизирована в дальнейшем. Я пробовал с различными значениями BUF_SIZE и получил некоторое ускорение. Кроме того, я переместился на writepos до buffer и readpos после buffer, чтобы обе переменные были в разных строках кэша, что также привело к некоторой скорости.

Что мне нужно, так это ускорение примерно на 400%. Есть ли у вас какие-либо идеи, как я мог бы добиться этого, используя такие вещи, как прокладка и т. Д.?

Ответы [ 3 ]

3 голосов
/ 05 мая 2010

Вот одна оптимизация, которую я вижу: в consume() вам не нужно постоянно извлекать b->readpos, поскольку поток, вызывающий consume(), единственный, кто может обновить его в любом случае. Поскольку это volatile, компилятор не может оптимизировать все эти извлечения, поэтому вам нужно сделать это явно:

void * consume (buf_t *b) {
    int rp = b->readpos;
    while (rp == b->writepos); // nothing to consume. wait
    int next = (rp + 1) % BUF_SIZE;
    void * res = b->buffer[rp]; b->readpos = next;
    return res;
}

Вы также должны шагать по своему буферу с шагом не менее одной строки кэша, в противном случае вы получите пинг-понг кеш-линий между двумя процессорами (поскольку один процессор хочет, чтобы строка кэша читала b->buffer[n] и 15 раз из 16 другой делает недействительным запись b->buffer[n+1]). Например:

#define STRIDE 16
#define STEPS 2048
#define BUF_SIZE (STRIDE * STEPS)

#define TO_INDEX(n) (STRIDE * (((n) + 1) % STEPS) + (((n) + 1) / STEPS))

void produce (buf_t *b, void * e) {
    unsigned wp = b->writepos;
    unsigned next = (wp + 1) % BUF_SIZE;
    while (b->readpos == next); // queue is full. wait
    b->buffer[TO_INDEX(wp)] = e; b->writepos = next;
}

void * consume (buf_t *b) {
    unsigned rp = b->readpos;
    while (rp == b->writepos); // nothing to consume. wait
    unsigned next = (rp + 1) % BUF_SIZE;
    void * res = b->buffer[TO_INDEX(rp)]; b->readpos = next;
    return res;
}

В любом случае, стоит попробовать. (Обратите внимание, что пока STRIDE и STEPS являются степенями 2, пугающее деление и модуль в TO_INDEX() могут быть оптимизированы для сдвига и побитового и, но только если операнды unsigned - поэтому я предлагаю изменить эти типы соответственно).

1 голос
/ 05 мая 2010

Я предполагаю, что вы используете машину с более чем одним процессором или ядром. Если нет, то ваши занятые ожидания навредят. Они могут повредить в любом случае, если вы работаете в операционной системе, которая решает, что вы недостаточно спите, снижает динамический приоритет и работают другие программы.

Вам необходимо собрать данные о том, насколько заполнен ваш буфер. В какой-то момент слишком большой начинает повреждать ваш кеш тоже.

Если вы используете глобальный массив, а не выделяете его из кучи, тогда указатель на буфер становится литералом-указателем, и обоим потокам не нужно будет считывать это значение указателя из одного и того же места в кеше, поскольку он будет просто перемещен в код.

Если для вас важна пропускная способность (за счет задержки), а кеш действительно играет большую роль, то вы можете разрешить потребителю отставать от производителя, чтобы он не пытался читать и писать из одного и того же источника. поместить в буфер.

Возможно, вы захотите изменить интерфейс для вашей функции потребителя, чтобы он мог потреблять куски размера кеша (или кратно размеру) (это хорошо, если потребитель отстает от предложения производителя, которое я сделал выше) в дополнение к индивидуальному или частичному Кэш строки. Попробуйте сохранить выравнивание кеша потребления. Если вы думаете о доступных данных как о змее, то голова и хвост могут быть выровнены. Вы должны использовать не выровненный хвост, когда нет других данных для потребления. Если вы можете использовать любые другие данные в вызове для потребления, то вам просто нужно оставить хвост для следующего вызова.

Кроме этого и того, что было упомянуто caf, я должен был бы заподозрить, что все, что происходит вне этого кода, должно играть большую роль.

0 голосов
/ 06 мая 2010

Я реализовал оптимизацию в первом блоке кода ответа кафе. Они на самом деле немного ускорили (спасибо), но этого еще недостаточно. Вторая оптимизация, которая приводит к заполнению кэша столбцом, а не строкой, приводит к снижению производительности.

Идея отставания потребителя от производителя не дала никакого ускорения.

Теперь я нахожусь на 300%.

Дополнительным изменением, которое я сделал, был некоторый хак относительно изменчивых переменных writepos и readpos:

void produce (void * e) {
    unsigned int oldpos = buffer.writepos;
    unsigned int next = (oldpos+1) % BUF_SIZE;
    while (next == buffer.rpos) { // rpos is not volatile
        buffer.rpos = buffer.readpos;
        usleep(1);
    }
    buffer.buffer[oldpos] = e; buffer.writepos = next;
}

и аналогичные для потребления ().

Дополнительные изменения в структуре приводят к следующей новой структуре буфера (в глобальной области, как было предложено в одном ответе вместо кучи).

#define STRIDE 16
#define STEPS 524288

struct buf_t {
    volatile unsigned int writepos;
    int offset [STRIDE - 1];
    unsigned int wpos;
    int offset2 [STRIDE - 1];
    volatile void * buffer[BUF_SIZE];
    int offset4 [STRIDE];
    volatile unsigned int readpos;
    int offset3 [STRIDE - 1];
    unsigned int rpos;
}

, который дал 300% -ое ускорение, которое отсутствовало, и подняло его ниже предела производительности, которого я должен был достигнуть.

Если у вас есть несколько дополнительных хаков, которые можно использовать для дальнейшего повышения производительности, не стесняйтесь также публиковать их: -)

Спасибо за помощь.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...