Разработка многопоточного приложения, которое хорошо масштабируется - PullRequest
0 голосов
/ 06 января 2019

Приведенный ниже код демонстрирует то, что я пытаюсь сделать, и имеет ту же проблему, что и мой исходный код (который здесь не включен). У меня есть код спектрограммы, и я пытаюсь улучшить его производительность, используя несколько потоков (мой компьютер имеет 4 ядра). Код спектрограммы в основном вычисляет БПФ по множеству перекрывающихся кадров (эти кадры соответствуют выборкам звука в конкретный момент времени).

В качестве примера скажем, что у нас 1000 кадров, которые перекрываются на 50%. Если мы используем 4 потока, то каждый поток должен обрабатывать 250 кадров. Перекрывающиеся кадры просто означают, что если наши кадры имеют длину 1024 выборки, первый кадр имеет диапазон 0-1023, второй кадр 512-1535, третий 1024-2047 и т. д. (перекрытие 512 выборок).

Код создания и использования потоков

void __fastcall TForm1::Button1Click(TObject *Sender)
{
    numThreads = 4;
    fftLen = 1024;
    numWindows = 10000;
    int startTime = GetTickCount();

    numOverlappingWindows = numWindows*2;
    overlap = fftLen/2;
    const unsigned numElem = fftLen*numWindows+overlap;

    rx = new float[numElem];
    for(int i=0; i<numElem; i++) {
        rx[i] = rand();
    }
    useThreads = true;
    vWThread.reserve(numOverlappingWindows);

    if(useThreads){
    for(int i=0;i<numThreads;i++){
            TWorkerThread *pWorkerThread = new TWorkerThread(true); 
            pWorkerThread->SetWorkerMethodCallback(&CalculateWindowFFTs);//this is called in TWorkerThread::Execute
            vWThread.push_back(pWorkerThread);
        }
        pLock = new TCriticalSection();

        for(int i=0;i<numThreads;i++){ //start the threads
            vWThread.at(i)->Resume();
        }

        while(TWorkerThread::GetNumThreads()>0);
        }else CalculateWindowFFTs();

        int endTime = GetTickCount();

        Label1->Caption = IntToStr(endTime-startTime);
}
void TForm1::CalculateWindowFFTs(){

        unsigned startWnd = 0, endWnd = numOverlappingWindows, threadId;

        if(useThreads){
            threadId = TWorkerThread::GetCurrentThreadId();
            unsigned wndPerThread = numOverlappingWindows/numThreads;
            startWnd = (threadId-1)*wndPerThread;
            endWnd   =  threadId*wndPerThread;

        if(numThreads==threadId){
            endWnd = numOverlappingWindows;
            }
        }

    float *pReal, *pImg;

    for(unsigned i=startWnd; i<endWnd; i++){

            pReal = new float[fftLen];
            pImg  = new float[fftLen];

            memcpy(pReal, &rx[i*overlap], fftLen*sizeof(float));
            memset(pImg, '0', fftLen);
            FFT(pReal, pImg, fftLen);  //perform an in place FFT

            pLock->Acquire();
            vWndFFT.push_back(pReal);
            vWndFFT.push_back(pImg);
            pLock->Release();
    }
}

void TForm1::FFT(float *rx, float *ix, int fftSize)
{
    int i, j, k, m;
    float rxt, ixt;

    m = log(fftSize)/log(2);
    int fftSizeHalf = fftSize/2;
    j = k = fftSizeHalf;

        for (i = 1; i < (fftSize-1); i++){
            if (i < j) {

            rxt = rx[j];
            ixt = ix[j];
            rx[j] = rx[i];
            ix[j] = ix[i];
            rx[i] = rxt;
            ix[i] = ixt;
            }
            k = fftSizeHalf;

            while (k <= j){
                j = j - k;
                k = k/2;
                }
            j = j + k;

        }    //end for
    int le, le2, l, ip;
    float sr, si, ur, ui;
    for (k = 1; k <= m; k++) {
        le = pow(2, k);
        le2 = le/2;
        ur = 1;
        ui = 0;
        sr = cos(PI/le2);
        si = -sin(PI/le2);
        for (j = 1; j <= le2; j++) {
            l = j - 1;
            for (i = l; i < fftSize; i += le) {
                ip = i + le2;
                rxt = rx[ip] * ur - ix[ip] * ui;
                ixt = rx[ip] * ui + ix[ip] * ur;
                rx[ip] = rx[i] - rxt;
                ix[ip] = ix[i] - ixt;
                rx[i] = rx[i] + rxt;
                ix[i] = ix[i] + ixt;
            }    //end for
            rxt = ur;
            ur = rxt * sr - ui * si;
            ui = rxt * si + ui * sr;
        }
    }
}

Хотя этот процесс легко разделить на несколько потоков, производительность улучшается лишь незначительно по сравнению с однопоточной версией (<10%). Интересно, что если я увеличу количество потоков, скажем, до 100, я получу увеличение скорости примерно на 25%, что удивительно, потому что Я ожидал бы, что издержки переключения контекста потока будут фактором в этом случае. </p>

Сначала я подумал, что основной причиной низкой производительности является блокировка записи в векторный объект, поэтому я экспериментировал с массивом векторов ( вектор на поток), что исключает необходимость в блокировках, но производительность осталась практически неизменной.

pVfft = new vector<float*>[numThreads];//create an array of vectors

  //and then in CalculateWindowFFTs, do something like

    vector<float*> &vThr = pVfft[threadId-1];
    for(unsigned i=startWnd; i<endWnd; i++){

            pReal = new float[fftLen];
            pImg  = new float[fftLen];

            memcpy(pReal, &rx[i*overlap], fftLen*sizeof(float));
            memset(pImg, '0', fftLen);
            FFT(pReal, pImg, fftLen);  //perform an in place FFT

            vThr.push_back(pReal);      
    }

Я думаю, что у меня проблемы с кешированием, хотя я не уверен, как изменить дизайн, чтобы найти решение, которое хорошо масштабируется.

Я также могу предоставить код для TWorkerThread, если вы считаете это важным.

Любая помощь очень ценится.

Спасибо

UPDATE:

Как предложено 1201ProgramAlarm, я удалил цикл while и получил повышение скорости на 15-20% в моей системе. Теперь мой основной поток активно не ожидает завершения потоков, а вместо этого TWorkerThread выполняет код в главном потоке через TThread::Synchronize после завершения всех рабочих потоков (т. Е. Когда numThreads достигло 0).

Хотя сейчас это выглядит лучше, оно все еще далеко от оптимальности.

1 Ответ

0 голосов
/ 07 января 2019

Блокировки для записи в vWndFFT будут повреждены, равно как и повторные (текущие) вызовы new, назначенные pReal и pImg (они должны быть вне цикла for).

Но реальным убийцей производительности, вероятно, является ваш цикл, ожидающий завершения потоков: while(TWorkerThread::GetNumThreads()>0);. Это будет использовать один доступный поток очень недружественным способом.

Одним из быстрых решений (не рекомендуется) было бы добавить sleep(1) (или 2, 5 или 10), чтобы цикл не был непрерывным.

Лучшим решением было бы сделать основной поток одним из ваших потоков вычислений и иметь способ для этого потока (после завершения всей обработки) просто дождаться завершения другого потока без использования ядра, используя что-то вроде WaitForMultipleObjects, доступное в Windows.

Один простой способ опробовать ваш многопоточный код - просто запустить поточный, но использовать только один поток. Производительность должна быть примерно такой же, как и у многопоточной версии, а результаты должны совпадать.

...