Как вы реализуете кольцевой буфер в C? - PullRequest
65 голосов
/ 06 мая 2009

Мне нужен кольцевой буфер фиксированного размера (выбираемый во время выполнения при его создании, а не во время компиляции), который может содержать объекты любого типа, и он должен быть очень высокой производительности , Я не думаю, что будут проблемы с ресурсами из-за того, что, несмотря на то, что это встроенная многозадачная среда, она совместная, поэтому сами задачи могут справиться с этим.

Моей первоначальной мыслью было сохранить простую структуру в буфере, которая бы содержала тип (simple enum / define) и пустой указатель на полезную нагрузку, но я хочу, чтобы это было как можно быстрее, поэтому я открыт для предложений которые включают обход кучи.

На самом деле я рад обойти любую стандартную библиотеку для необработанной скорости - из того, что я видел в коде, он не сильно оптимизирован для CPU: похоже, они просто скомпилировали код C для таких вещей, как strcpy() и так далее, сборка с ручным кодированием отсутствует.

Любой код или идеи будут с благодарностью. Требуемые операции:

  • создать буфер с определенным размером.
  • поставить на хвост.
  • получить от головы.
  • вернуть счет.
  • удалить буфер.

Ответы [ 7 ]

72 голосов
/ 06 мая 2009

Самое простое решение - отслеживать размер элемента и количество элементов, а затем создать буфер с соответствующим количеством байтов:

typedef struct circular_buffer
{
    void *buffer;     // data buffer
    void *buffer_end; // end of data buffer
    size_t capacity;  // maximum number of items in the buffer
    size_t count;     // number of items in the buffer
    size_t sz;        // size of each item in the buffer
    void *head;       // pointer to head
    void *tail;       // pointer to tail
} circular_buffer;

void cb_init(circular_buffer *cb, size_t capacity, size_t sz)
{
    cb->buffer = malloc(capacity * sz);
    if(cb->buffer == NULL)
        // handle error
    cb->buffer_end = (char *)cb->buffer + capacity * sz;
    cb->capacity = capacity;
    cb->count = 0;
    cb->sz = sz;
    cb->head = cb->buffer;
    cb->tail = cb->buffer;
}

void cb_free(circular_buffer *cb)
{
    free(cb->buffer);
    // clear out other fields too, just to be safe
}

void cb_push_back(circular_buffer *cb, const void *item)
{
    if(cb->count == cb->capacity){
        // handle error
    }
    memcpy(cb->head, item, cb->sz);
    cb->head = (char*)cb->head + cb->sz;
    if(cb->head == cb->buffer_end)
        cb->head = cb->buffer;
    cb->count++;
}

void cb_pop_front(circular_buffer *cb, void *item)
{
    if(cb->count == 0){
        // handle error
    }
    memcpy(item, cb->tail, cb->sz);
    cb->tail = (char*)cb->tail + cb->sz;
    if(cb->tail == cb->buffer_end)
        cb->tail = cb->buffer;
    cb->count--;
}
15 голосов
/ 20 ноября 2009
// Note power of two buffer size
#define kNumPointsInMyBuffer 1024 

typedef struct _ringBuffer {
    UInt32 currentIndex;
    UInt32 sizeOfBuffer;
    double data[kNumPointsInMyBuffer];
} ringBuffer;

// Initialize the ring buffer
ringBuffer *myRingBuffer = (ringBuffer *)calloc(1, sizeof(ringBuffer));
myRingBuffer->sizeOfBuffer = kNumPointsInMyBuffer;
myRingBuffer->currentIndex = 0;

// A little function to write into the buffer
// N.B. First argument of writeIntoBuffer() just happens to have the
// same as the one calloc'ed above. It will only point to the same
// space in memory if the calloc'ed pointer is passed to
// writeIntoBuffer() as an arg when the function is called. Consider
// using another name for clarity
void writeIntoBuffer(ringBuffer *myRingBuffer, double *myData, int numsamples) {
    // -1 for our binary modulo in a moment
    int buffLen = myRingBuffer->sizeOfBuffer - 1;
    int lastWrittenSample = myRingBuffer->currentIndex;

    int idx;
    for (int i=0; i < numsamples; ++i) {
        // modulo will automagically wrap around our index
        idx = (i + lastWrittenSample) & buffLen; 
        myRingBuffer->data[idx] = myData[i];
    }

    // Update the current index of our ring buffer.
    myRingBuffer->currentIndex += numsamples;
    myRingBuffer->currentIndex &= myRingBuffer->sizeOfBuffer - 1;
}

До тех пор, пока длина вашего кольцевого буфера является степенью двойки, невероятно быстрая двоичная операция "&" обернется вокруг вашего индекса для вас. Для моего приложения я показываю пользователю сегмент аудио из кольцевого буфера аудио, полученного с микрофона.

Я всегда проверяю, что максимальное количество звука, которое может отображаться на экране, намного меньше размера кольцевого буфера. В противном случае вы можете читать и писать из одного и того же куска. Это, вероятно, даст вам странные артефакты отображения.

11 голосов
/ 15 декабря 2012

Во-первых, заголовок. Вам не нужна арифметика по модулю, чтобы обернуть буфер, если вы используете битовые целые, чтобы удерживать «указатели» head & tail, и размер их так, чтобы они были идеально синхронизированы. IE: 4096, вставленный в 12-битное целое число без знака, равно 0 само по себе, без каких-либо ограничений. Устранение арифметики по модулю, даже для степеней 2, удваивает скорость - почти точно.

10 миллионов итераций заполнения и опустошения буфера 4096 для элементов данных любого типа занимают 52 секунды на моем третьем поколении iPS Dell XPS 8500 третьего поколения с использованием компилятора Visual Studio 2010 C ++ со встроенной версией по умолчанию и 1/8192 из этого для обслуживания данных .

Я бы RX переписал тестовые циклы в main (), чтобы они больше не управляли потоком, который контролируется и должен контролироваться возвращаемыми значениями, указывающими, что буфер заполнен или пуст, и разрывом оператора; заявления. То есть: наполнитель и сушилка должны иметь возможность биться друг против друга без искажения или нестабильности. В какой-то момент я надеюсь многопоточность этого кода, после чего это поведение будет иметь решающее значение.

QUEUE_DESC (дескриптор очереди) и функция инициализации заставляют все буферы в этом коде иметь степень 2. Вышеупомянутая схема НЕ будет работать иначе. Что касается темы, обратите внимание, что QUEUE_DESC не жестко запрограммирован, он использует константу манифеста (#define BITS_ELE_KNT) для своей конструкции. (Я предполагаю, что степень 2 - это достаточная гибкость)

Чтобы сделать выбор размера буфера во время выполнения, я попробовал разные подходы (здесь не показаны) и остановился на использовании USHRT для Head, Tail, EleKnt, способных управлять буфером FIFO [USHRT]. Чтобы избежать арифметики по модулю, я создал маску для && с Head, Tail, но эта маска оказывается (EleKnt -1), поэтому просто используйте ее. Использование USHRTS вместо битовых битов увеличило производительность ~ 15% на тихой машине. Ядра процессоров Intel всегда были быстрее, чем их шины, поэтому на загруженной, совместно используемой машине упаковка ваших структур данных загружает вас и опережает другие конкурирующие потоки. Компромиссы.

Обратите внимание, что фактическое хранилище для буфера выделяется в куче с помощью calloc (), а указатель находится в основе структуры, поэтому структура и указатель имеют ТОЧНО одинаковый адрес. IE; для связывания регистров не требуется добавлять смещение к адресу структуры.

В том же духе все переменные, связанные с обслуживанием буфера, физически смежны с буфером, связаны в одну структуру, поэтому компилятор может создать прекрасный язык ассемблера. Вам придется убить встроенную оптимизацию, чтобы увидеть любую сборку, потому что в противном случае она будет заброшена.

Чтобы поддерживать полиморфизм любого типа данных, я использовал memcpy () вместо присваиваний. Если вам нужна гибкость для поддержки одного случайного типа переменной для каждой компиляции, тогда этот код работает отлично.

Для полиморфизма вам просто нужно знать тип и требования к его хранилищу. Массив дескрипторов DATA_DESC позволяет отслеживать каждый элемент данных, помещаемый в QUEUE_DESC.pBuffer, чтобы его можно было правильно извлечь. Я бы просто выделил достаточно памяти pBuffer для хранения всех элементов с самым большим типом данных, но следил за тем, сколько из этого хранилища фактически используется данным датумом в DATA_DESC.dBytes. Альтернатива - заново изобрести менеджер кучи.

Это означает, что UCHAR * pBuffer в QUEUE_DESC будет иметь параллельный сопутствующий массив для отслеживания типа данных и размера, тогда как место хранения данных в pBuffer останется таким же, как и сейчас. Новым членом будет что-то вроде DATA_DESC * pDataDesc или, возможно, DATA_DESC DataDesc [2 ^ BITS_ELE_KNT], если вы сможете найти способ превратить ваш компилятор в представление с такой прямой ссылкой. Calloc () всегда более гибок в этих ситуациях.

Вы бы все еще использовали memcpy () в Q_Put (), Q_Get, но количество фактически скопированных байтов будет определяться DATA_DESC.dBytes, а не QUEUE_DESC.EleBytes. Элементы могут быть всех разных типов / размеров для любого заданного пута или гета.

Я полагаю, что этот код удовлетворяет требованиям к скорости и размеру буфера и может быть выполнен с учетом требований для 6 различных типов данных. Я оставил множество тестовых приспособлений в форме операторов printf (), чтобы вы могли убедиться (или нет), что код работает правильно. Генератор случайных чисел демонстрирует, что код работает для любой случайной комбинации голова / хвост.

enter code here
// Queue_Small.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <stdio.h>
#include <time.h>
#include <limits.h>
#include <stdlib.h>
#include <malloc.h>
#include <memory.h>
#include <math.h>

#define UCHAR unsigned char
#define ULONG unsigned long
#define USHRT unsigned short
#define dbl   double
/* Queue structure */
#define QUEUE_FULL_FLAG 1
#define QUEUE_EMPTY_FLAG -1
#define QUEUE_OK 0
//  
#define BITS_ELE_KNT    12  //12 bits will create 4.096 elements numbered 0-4095
//
//typedef struct    {
//  USHRT dBytes:8;     //amount of QUEUE_DESC.EleBytes storage used by datatype
//  USHRT dType :3; //supports 8 possible data types (0-7)
//  USHRT dFoo  :5; //unused bits of the unsigned short host's storage
// }    DATA_DESC;
//  This descriptor gives a home to all the housekeeping variables
typedef struct  {
    UCHAR   *pBuffer;   //  pointer to storage, 16 to 4096 elements
    ULONG Tail  :BITS_ELE_KNT;  //  # elements, with range of 0-4095
    ULONG Head  :BITS_ELE_KNT;  //  # elements, with range of 0-4095
    ULONG EleBytes  :8;     //  sizeof(elements) with range of 0-256 bytes
    // some unused bits will be left over if BITS_ELE_KNT < 12
    USHRT EleKnt    :BITS_ELE_KNT +1;// 1 extra bit for # elements (1-4096)
    //USHRT Flags   :(8*sizeof(USHRT) - BITS_ELE_KNT +1);   //  flags you can use
    USHRT   IsFull  :1;     // queue is full
    USHRT   IsEmpty :1;     // queue is empty
    USHRT   Unused  :1;     // 16th bit of USHRT
}   QUEUE_DESC;

//  ---------------------------------------------------------------------------
//  Function prototypes
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz);
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew);
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q);
//  ---------------------------------------------------------------------------
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz)    {
    memset((void *)Q, 0, sizeof(QUEUE_DESC));//init flags and bit integers to zero
    //select buffer size from powers of 2 to receive modulo 
    //                arithmetic benefit of bit uints overflowing
    Q->EleKnt   =   (USHRT)pow(2.0, BitsForEleKnt);
    Q->EleBytes =   DataTypeSz; // how much storage for each element?
    //  Randomly generated head, tail a test fixture only. 
    //      Demonstrates that the queue can be entered at a random point 
    //      and still perform properly. Normally zero
    srand(unsigned(time(NULL)));    // seed random number generator with current time
    Q->Head = Q->Tail = rand(); // supposed to be set to zero here, or by memset
    Q->Head = Q->Tail = 0;
    //  allocate queue's storage
    if(NULL == (Q->pBuffer = (UCHAR *)calloc(Q->EleKnt, Q->EleBytes)))  {
        return NULL;
    }   else    {
        return Q;
    }
}
//  ---------------------------------------------------------------------------
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew)   
{
    memcpy(Q->pBuffer + (Q->Tail * Q->EleBytes), pNew, Q->EleBytes);
    if(Q->Tail == (Q->Head + Q->EleKnt)) {
        //  Q->IsFull = 1;
        Q->Tail += 1;   
        return QUEUE_FULL_FLAG; //  queue is full
    }
    Q->Tail += 1;   //  the unsigned bit int MUST wrap around, just like modulo
    return QUEUE_OK; // No errors
}
//  ---------------------------------------------------------------------------
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q)   
{
    memcpy(pOld, Q->pBuffer + (Q->Head * Q->EleBytes), Q->EleBytes);
    Q->Head += 1;   //  the bit int MUST wrap around, just like modulo

    if(Q->Head == Q->Tail)      {
        //  Q->IsEmpty = 1;
        return QUEUE_EMPTY_FLAG; // queue Empty - nothing to get
    }
    return QUEUE_OK; // No errors
}
//
//  ---------------------------------------------------------------------------
int _tmain(int argc, _TCHAR* argv[])    {
//  constrain buffer size to some power of 2 to force faux modulo arithmetic
    int LoopKnt = 1000000;  //  for benchmarking purposes only
    int k, i=0, Qview=0;
    time_t start;
    QUEUE_DESC Queue, *Q;
    if(NULL == (Q = Q_Init(&Queue, BITS_ELE_KNT, sizeof(int)))) {
        printf("\nProgram failed to initialize. Aborting.\n\n");
        return 0;
    }

    start = clock();
    for(k=0; k<LoopKnt; k++)    {
        //printf("\n\n Fill'er up please...\n");
        //Q->Head = Q->Tail = rand();
        for(i=1; i<= Q->EleKnt; i++)    {
            Qview = i*i;
            if(QUEUE_FULL_FLAG == Q_Put(Q, (UCHAR *)&Qview))    {
                //printf("\nQueue is full at %i \n", i);
                //printf("\nQueue value of %i should be %i squared", Qview, i);
                break;
            }
            //printf("\nQueue value of %i should be %i squared", Qview, i);
        }
        //  Get data from queue until completely drained (empty)
        //
        //printf("\n\n Step into the lab, and see what's on the slab... \n");
        Qview = 0;
        for(i=1; i; i++)    {
            if(QUEUE_EMPTY_FLAG == Q_Get((UCHAR *)&Qview, Q))   {
                //printf("\nQueue value of %i should be %i squared", Qview, i);
                //printf("\nQueue is empty at %i", i);
                break;
            }
            //printf("\nQueue value of %i should be %i squared", Qview, i);
        }
        //printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
    }
    printf("\nQueue time was %5.3f to fill & drain %i element queue  %i times \n", 
                     (dbl)(clock()-start)/(dbl)CLOCKS_PER_SEC,Q->EleKnt, LoopKnt);
    printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
    getchar();
    return 0;
}
9 голосов
/ 06 мая 2009

Можете ли вы перечислить типы, необходимые во время кодирования буфера, или вам нужно иметь возможность добавлять типы во время выполнения с помощью динамических вызовов? Если первое, то я бы создал буфер в виде массива из n структур, выделенных в куче, где каждая структура состоит из двух элементов: тега enum, идентифицирующего тип данных, и объединения всех типов данных. То, что вы теряете в плане дополнительного хранилища для небольших элементов, вы компенсируете тем, что вам не нужно иметь дело с выделением / освобождением и последующей фрагментацией памяти. Тогда вам просто нужно отслеживать начальный и конечный индексы, которые определяют элементы head и tail буфера, и обязательно вычислять mod n при увеличении / уменьшении индексов.

8 голосов
/ 27 декабря 2012

Вот простое решение на C. Предположим, что прерывания отключены для каждой функции. Никакого полиморфизма и прочего, просто здравый смысл.


#define BUFSIZE 128
char buf[BUFSIZE];
char *pIn, *pOut, *pEnd;
char full;

// init
void buf_init()
{
    pIn = pOut = buf;       // init to any slot in buffer
    pEnd = &buf[BUFSIZE];   // past last valid slot in buffer
    full = 0;               // buffer is empty
}

// add char 'c' to buffer
int buf_put(char c)
{
    if (pIn == pOut  &&  full)
        return 0;           // buffer overrun

    *pIn++ = c;             // insert c into buffer
    if (pIn >= pEnd)        // end of circular buffer?
        pIn = buf;          // wrap around

    if (pIn == pOut)        // did we run into the output ptr?
        full = 1;           // can't add any more data into buffer
    return 1;               // all OK
}

// get a char from circular buffer
int buf_get(char *pc)
{
    if (pIn == pOut  &&  !full)
        return 0;           // buffer empty  FAIL

    *pc = *pOut++;              // pick up next char to be returned
    if (pOut >= pEnd)       // end of circular buffer?
        pOut = buf;         // wrap around

    full = 0;               // there is at least 1 slot
    return 1;               // *pc has the data to be returned
}
2 голосов
/ 13 октября 2015

C стиль, простой кольцевой буфер для целых чисел. Сначала используйте init, затем используйте put и get. Если буфер не содержит никаких данных, он возвращает ноль "0".

//=====================================
// ring buffer address based
//=====================================
#define cRingBufCount   512
int     sRingBuf[cRingBufCount];    // Ring Buffer
int     sRingBufPut;                // Input index address
int     sRingBufGet;                // Output index address
Bool    sRingOverWrite;

void    GetRingBufCount(void)
{
int     r;
`       r= sRingBufPut - sRingBufGet;
        if ( r < cRingBufCount ) r+= cRingBufCount;
        return r; 
}

void    InitRingBuffer(void)
{
        sRingBufPut= 0;
        sRingBufGet= 0;
}       

void    PutRingBuffer(int d)
{
        sRingBuffer[sRingBufPut]= d;
        if (sRingBufPut==sRingBufGet)// both address are like ziro
        {
            sRingBufPut= IncRingBufferPointer(sRingBufPut);
            sRingBufGet= IncRingBufferPointer(sRingBufGet);
        }
        else //Put over write a data
        {
            sRingBufPut= IncRingBufferPointer(sRingBufPut);
            if (sRingBufPut==sRingBufGet)
            {
                sRingOverWrite= Ture;
                sRingBufGet= IncRingBufferPointer(sRingBufGet);
            }
        }
}

int     GetRingBuffer(void)
{
int     r;
        if (sRingBufGet==sRingBufPut) return 0;
        r= sRingBuf[sRingBufGet];
        sRingBufGet= IncRingBufferPointer(sRingBufGet);
        sRingOverWrite=False;
        return r;
}

int     IncRingBufferPointer(int a)
{
        a+= 1;
        if (a>= cRingBufCount) a= 0;
        return a;
}
2 голосов
/ 06 мая 2009

Простая реализация может состоять из:

  • Буфер, реализованный в виде массива размера n, любого типа, который вам нужен
  • Указатель чтения или индекс (в зависимости от того, что более эффективно для вашего процессора)
  • Указатель записи или индекс
  • Счетчик, указывающий, сколько данных находится в буфере (выводится из указателей чтения и записи, но быстрее отслеживает их отдельно)

Каждый раз, когда вы пишете данные, вы перемещаете указатель записи и увеличиваете счетчик. Когда вы читаете данные, вы увеличиваете указатель чтения и уменьшаете счетчик. Если любой указатель достигает n, установите его на ноль.

Вы не можете писать, если counter = n. Вы не можете прочитать, если counter = 0.

...