Эффективный кольцевой буфер в C ++, который будет передан в параметр функции массива в стиле C - PullRequest
1 голос
/ 14 июля 2020

Мне нужен совет по поводу моего подхода к следующей проблеме. У меня есть постоянный ввод данных, которые мне нужно добавить в буфер, и на каждой итерации мне нужно передавать буферизованные данные в функцию, которая принимает массив в стиле C через указатель.

I ' m беспокоясь об эффективности, поэтому я подумал, как я могу хранить и управлять данными в каком-то кольцевом буфере, а также получать их как последовательные необработанные данные для передачи в указанную функцию.

Мой текущий подход можно резюмировать в следующем примере:

#include <iostream>
#include <array>
#include <algorithm>

void foo(double* arr, int size)
{
  for (uint k = 0; k < size; k++)
    std::cout << arr[k] << ", ";

  std::cout << std::endl;
}

int main()
{
  const int size = 20;
  std::array<double, size> buffer{};

  for (double data = 0.0; data < 50.0; data += 1.0)
  {
      std::move(std::next(std::begin(buffer)), std::end(buffer), std::begin(buffer));
      buffer.back() = data;

      foo(buffer.data(), size);
  }
}

В реальном варианте использования буфер также должен быть дополнен до размера "const" данных в начале (здесь я использую кавычки, потому что размер может или не может быть известным во время компиляции, но как только он станет известен, он никогда не изменится).

Я храню данные в std::array (или в std::vector, если размер не будет известен во время компиляции ), поскольку данные в памяти располагаются последовательно. Когда мне нужно вставить новые данные, я использую forward std::move, чтобы сдвинуть все, а затем вручную заменяю последний элемент. Наконец, я просто передаю std::array::data() и его размер функции.

Хотя на первый взгляд это должно работать эффективно, причина подсказывает мне, что, поскольку данные сохраняются последовательно, весь буфер все равно будет скопирован с std::move, и каждая вставка будет O (n)

Реальный размер буфера, вероятно, будет только в сотнях, а данные поступают с максимальной частотой 100 Гц, но проблема в том, что мне нужен результат вызываемой функции как можно скорее поэтому я не хочу терять время на управление буфером (даже если мы говорим несколько или даже меньше миллисекунд). У меня много вопросов по этому поводу, но их краткий список следующий:

  • Мой подход слишком наивен?
  • Верны ли мои рассуждения о O (n)?
  • Есть ли другие подводные камни с этим подходом?
  • Есть ли у вас предложения по поводу другого подхода, который мне следует изучить?

Ответы [ 3 ]

1 голос
/ 14 июля 2020

Спасибо за ответ Вернер. Когда я запускаю это решение на Repl.it, я получаю:

it took an average of 21us and a max of 57382us

Для сравнения, моя первоначальная идея с тем же размером буфера дает следующий результат:

it took an average of 19us and a max of 54129us

Это означает, что мой первоначальный подход действительно был наивным :)

Тем временем, ожидая ответа, я придумал следующее решение:

#include <iostream>
#include <array>
#include <algorithm>
#include <chrono>

void foo(double* arr, int size)
{
  for (uint k = 0; k < size; k++)
    std::cout << arr[k] << ", ";

  std::cout << std::endl;
}

int main()
{
  const int buffer_size = 20;
  std::array<double, buffer_size*2> buffer{};
  int buffer_idx = buffer_size;

  for (double data = 0.0; data < 100.0; data += 1.0)
  {
    buffer.at(buffer_idx - buffer_size) = data;
    buffer.at(buffer_idx++) = data;

    foo(buffer.data() + buffer_idx - buffer_size, buffer_size);

    buffer_idx -= buffer_size * (buffer_idx == buffer_size * 2);
  }
}

Поскольку размер буфера не проблема, я выделяю вдвое больше памяти и вставляю данные в двух местах, смещенных размером буфера. Когда дохожу до конца, я просто go обратно как машинка. Идея состоит в том, что я подделываю кольцевой буфер, сохраняя еще одну копию данных, чтобы он мог читать данные, как если бы они пересекали полный круг.

Для размера буфера 50000 это дает мне следующий результат, который я разыскивается:

it took an average of 0us and a max of 23us
0 голосов
/ 14 июля 2020

Помимо ответа от stribor14 у меня есть еще два предложения. Они основаны только на производительности, поэтому читабельный или обслуживаемый код здесь не будет. Когда все места будут записаны, вторая половина будет скопирована в первую. Мой первый инстинкт подсказывает, что это могло бы быть лучше. Мое рассуждение заключалось в том, что общее количество записей будет одинаковым, но все записи будут последовательными (вместо того, чтобы перескакивать каждую секунду записи в другое место в массиве).

#include <cstddef>
#include <cstring>
#include <array>

const size_t buffer_size = 50'000;

int main()
{
    std::array<double, 2 * buffer_size> buffer{};
    double *index = buffer.data();
    double *mid = index + buffer_size;

    for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
    {
        if (index == mid)
        {
            index = buffer.data();
            std::memcpy(index, mid, buffer_size * sizeof(double));
        }

        *(index++ + buffer_size) = data;

        foo(index, buffer_size);
    }
}

В качестве альтернативы я думал, что это возможно чтобы оптимизировать собственный ответ OP, чтобы удалить доступ к массиву. Идея состоит в том, что buffer[buffer_idx - buffer_size] требует 2 сложения для вычисления местоположения этого значения, а именно: *(buffer + buffer_idx - buffer_size). Если buffer_idx содержит указатель, требуется только одно добавление. Это дает следующий код:

#include <cstddef>
#include <array>

const size_t buffer_size = 50'000;

int main()
{
    std::array<double, buffer_size * 2> buffer{};
    double *index = buffer.data();
    double *mid = buffer.data() + buffer_size;

    for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
    {
        *index = data;
        *(index + buffer_size) = data;
        ++index;

        index -= buffer_size * (index == mid);

        foo(index, buffer_size);
    }
}

Теперь я заметил, что иду по кроличьей норе оптимизации C ++. Так что мы не могли останавливаться на достигнутом. Чтобы выбрать, какую реализацию использовать, я хотел запустить тест. Вернер Пиркл дал хорошую отправную точку . Но запускать это в нашем оптимизированном коде бессмысленно, потому что измеренное время составляет 0 мкс. Так что давайте немного изменим его. Я написал al oop more внутри теста, чтобы дать ему время выполнения, и придумал:

const int repeats = 1000;
volatile double *ptr;
int duration = 0;
const size_t buffer_size = 50'000;

// ... Set up of the buffers and indices

for (int i = 0; i < repeats; ++i)
{
    auto t1 = std::chrono::high_resolution_clock::now();

    for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
    {
        // ... add data to circular buffer

        ptr = // ... the start of the array
    }

    auto t2 = std::chrono::high_resolution_clock::now();
    duration += std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count();
}

(обратите внимание на использование volatile double *, чтобы гарантировать, что необработанный указатель в непрерывный массив не оптимизирован.)

Во время выполнения этих тестов я заметил, что они очень зависят от флагов компилятора (-O2 -O3 -march = native ...). Я приведу некоторые результаты, но, как и все тесты C ++, относитесь к ним с недоверием и выполняйте свои собственные с реальной рабочей нагрузкой. (Время указано в среднем нс на вставку)

                     with `memcpy`   stribor14   `operator[]`   with pointers 
                   |---------------|-----------|--------------|---------------|
               -O2 |         1.38  |     1.57  |        1.41  |         1.15  |
               -O3 |         1.37  |     1.63  |        1.36  |         1.09  |
 -O3 -march=native |         1.35  |     1.61  |        1.34  |         1.09  |

Излишне говорить: я был весьма разочарован тем, что, по моему мнению, должно работать лучше всего. Но, как указывалось ранее, этот тест никоим образом не отражает реальную производительность.

0 голосов
/ 14 июля 2020

Вам всегда придется копировать ваши данные, так как «непрерывного» кольцевого буфера не существует (возможно, в каком-то модном кремнии он есть).

Также вы не можете инициализировать шаблон массива среды выполнения определенный размер.

Для этого можно использовать вектор:

#include <iostream>
#include <chrono>
#include <deque>
#include <vector>

int main() {

    std::vector<double> v;

    // pre fill it a little
    for(double data = 0.0; data > -50000.0; data -= 1.0) {
        v.push_back(data);
    }

    size_t cnt = 0;
    int duration = 0;
    int max = 0;

    for(double data = 0.0; data < 50000.0; data += 1.0, ++cnt) {

        auto t1 = std::chrono::high_resolution_clock::now();

        v.push_back(data);
        v.erase(v.begin());

        // foo(v.data(), v.size());

        auto t2 = std::chrono::high_resolution_clock::now();
        auto delta = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count();
        duration += delta;

        if(max == 0 || max < delta) {
            max = delta;
        }

    }

    std::cout << "it took an average of " << duration / cnt << "us and a max of " << max << " us" << std::endl;

    return 0;
}

Вывод:

it took an average of 11us and a max of 245 us
...