Как я могу заменить мьютекс на правильную работу при использовании Vivado HLS? - PullRequest
0 голосов
/ 15 мая 2018

Извините заранее, потому что я новичок в Vivado HLS. В моем следующем коде я хочу синтезировать его, но Вивадо говорит мне, что вы не можете использовать мьютекс и все, что зависит, и выдает мне следующие ошибки.

ERROR: [SYNCHK 200-11] Global Variable 'readyQMutex' has an unsynthesizable struct type '%union.pthread_mutex_t.2.12.22 = type { %struct.__pthread_mu...' (a member pointer to struct itself).
ERROR: [SYNCHK 200-71] ../fpga_top.c:221: function 'pthread_mutex_lock' has no function body.
ERROR: [SYNCHK 200-71] ../fpga_top.c:225: function 'pthread_cond_wait' has no function body.
ERROR: [SYNCHK 200-71] ../fpga_top.c:237: function 'pthread_cond_signal' has no function body.
ERROR: [SYNCHK 200-71] ../fpga_top.c:238: function 'pthread_mutex_unlock' has no function body.
ERROR: [SYNCHK 200-11] ../fpga_top.c:18: Constant 'workerInfos' has an unsynthesizable type '[4 x %struct.threadInfo.6.16.26]*' (possible cause(s): structure variable cannot be decomposed due to (1) unsupported type conversion; (2) memory copy operation; (3) function pointer used in struct; (4) unsupported pointer comparison).
ERROR: [SYNCHK 200-61] ../fpga_top.c:75: unsupported memory access on variable 'child_task_ID' which is (or contains) an array with unknown size at compile time.
ERROR: [SYNCHK 200-71] ../fpga_top.c:77: function 'pthread_mutex_init' has no function body.
INFO: [SYNCHK 200-10] 8 error(s), 0 warning(s).

Я обнаружил, что должен написать соответствующий код, чтобы справиться с ним самостоятельно, если да, то как и что я должен написать?!

#include <stdbool.h>
#include "fpga_top.h"

int outputIndex = 0;
double core_speed[CORE_MAX] = {1.0, 1.0, 1.0, 1.0};
double outputTable[WORKLOAD_MAX*TASK_COUNT_MAX][EXCEL_Column_Size];

int readyQueueHead = 0;
int readyQueueRear = 0;
int readyQueueSize = 0;
char canContinue_ = 1;
int wlCounter = 0;      
bool flag = 1;

// Add Task to assignedQueue
void addToAssignedQueue(int task_ID, int workload_ID, int q)
{
    pthread_mutex_lock(&(workerInfos[q].workerMutex));
    while( workerInfos[q].assignedQSize>=DEEP)
    {
        pthread_cond_wait(&(workerInfos[q].workerWaitHandle_Add), &(workerInfos[q].workerMutex));
    }
    int i = workerInfos[q].assignedQRear;
    workerInfos[q].assignedQueue[i].task_ID = task_ID;
    workerInfos[q].assignedQueue[i].workload_ID = workload_ID;
    workerInfos[q].assignedQRear = (workerInfos[q].assignedQRear + 1) % DEEP;
    workerInfos[q].assignedQSize++;
    // A signal to a worker waiting to read from this queue
    pthread_cond_signal(&(workerInfos[q].workerWaitHandle));
    pthread_mutex_unlock(&(workerInfos[q].workerMutex));
}

// Read from assignedQueue
struct workItem readFromAssignedQueue(int q)
{
    struct threadInfo *workerInfo_ = &workerInfos[q];
    pthread_mutex_lock(&(workerInfo_->workerMutex));

    struct workItem tas_;
    // Initialize the output values (which may not be necessary now)
    tas_.task_ID = -1;
    tas_.workload_ID = -1;
    if(workerInfo_->assignedQSize <= 0)
    {
        struct timespec time_to_wait = {10, 0}; //10 sec wait
        pthread_cond_timedwait(&(workerInfo_->workerWaitHandle), &(workerInfo_->workerMutex), &time_to_wait);
    }
    if(workerInfo_->assignedQSize >0)
    {
        // Reading the assignedQueue if data is available
        tas_ = workerInfo_->assignedQueue[workerInfo_->assignedQHead];
        // Move forward the queue head index rotationally
        workerInfos[q].assignedQHead = (workerInfos[q].assignedQHead + 1) % DEEP;
        // Decreasing the count number of queue elements
        workerInfos[q].assignedQSize--;
        pthread_cond_signal(&(workerInfos[q].workerWaitHandle_Add));
    }
    pthread_mutex_unlock(&(workerInfo_->workerMutex));
    return tas_;
}

// Add Definition of Task to DAG
void addTask(int task_ID, int parentCount, int child_task_ID[], int childCount, int processingTime)
{
    struct Task_Package_Profile *p_task_ = &(taskArray[task_ID]);
    p_task_->parentCount = parentCount;
    p_task_->childCount = childCount;
    p_task_->processingTime = processingTime;
    // Initialize the parentReady variable for all workloads
    for (int i = 0; i < WORKLOAD_MAX;i++) {p_task_->parentReady[i] = 0;}
    // Copy the child's index
    for (int i = 0; i < childCount; i++) {p_task_->child_task_ID[i] = child_task_ID[i];}
    // Make parentReady mutex
    pthread_mutex_init(&(p_task_->parentReadyMutex), NULL);
}

// DAG Definition
void initDag()
{
    int ch0[] = { 1, 2, 3, 4}; addTask( 0, 0, ch0, 4, 10000);

    int ch1[] = { 5, 6, 7, 8}; addTask( 1, 1, ch1, 4, 20000);
    int ch2[] = { 5, 6, 7, 8}; addTask( 2, 1, ch2, 4, 20000);
    int ch3[] = { 5, 6, 7, 8}; addTask( 3, 1, ch3, 4, 20000);
    int ch4[] = { 5, 6, 7, 8}; addTask( 4, 1, ch4, 4, 20000);

    int ch5[] = { 9, 10}; addTask( 5, 4, ch5, 2, 30000);
    int ch6[] = { 9, 10}; addTask( 6, 4, ch6, 2, 30000);
    int ch7[] = { 9, 10}; addTask( 7, 4, ch7, 2, 30000);
    int ch8[] = { 9, 10}; addTask( 8, 4, ch8, 2, 30000);

    int ch9[] = { 11, 12}; addTask( 9, 4, ch9, 2, 40000);
    int ch10[] = { 11, 12}; addTask( 10, 4, ch10, 2, 40000);

    int ch11[] = {}; addTask( 11, 2, ch11, 0, 50000);
    int ch12[] = {}; addTask( 12, 2, ch12, 0, 50000);

    addToReadyQueue(0, 0);              // Root task, addToReadyQueue(int task_ID, int workload_ID)
    readFromReadyQueue();
    //allocateTask(0, 0, 0);                    // allocateTask(int task_ID, int workload_ID, int core_ID)
}

// Add Task to the end of the readyQueue 
void addToReadyQueue(int task_ID, int workload_ID)
{
    pthread_mutex_lock(&readyQMutex);
    while(readyQueueSize >= READY_LOOP_DEEP)
    {
        // Waiting for the queue to be empty if there is no space
        int res = pthread_cond_wait( &readyQWaitHandleAdd, &readyQMutex);
    }
    #ifdef PRINT_ReadyQ
        printf("Task #%d (workload #%d) added to readyQueue %d:%d.\n", task_ID, workload_ID,readyQueueRear, readyQueueSize);
    #endif
    readyQueue[readyQueueRear].task_ID = task_ID;
    readyQueue[readyQueueRear].workload_ID = workload_ID;
    // Move forward the queue rear index in rotation
    readyQueueRear = (readyQueueRear + 1) % READY_LOOP_DEEP;
    // Increasing the number of the queue elements
    readyQueueSize++;
    // The signal is given to workers waiting to read from the queue
    pthread_cond_signal(&readyQWaitHandleRead);
    pthread_mutex_unlock(&readyQMutex);
}

// Read from the beginning of the readyQueue
struct workItem readFromReadyQueue()
{
    struct workItem witem;
    witem.task_ID = -1;
    witem.workload_ID = -1;

    pthread_mutex_lock(&readyQMutex);
    // Waiting to queue if empty
    while(readyQueueSize <= 0)
    {
        pthread_cond_wait( &readyQWaitHandleRead, &readyQMutex);
    }
    // Picking up from queue head
    witem = readyQueue[readyQueueHead];
    // Move forward the queue head index in rotation
    readyQueueHead = (readyQueueHead + 1) % READY_LOOP_DEEP;
    // Reduce the number of queue elements
    readyQueueSize--;
    #ifdef PRINT_ReadyQ
        printf("Task #%d (workload #%d) removed to readyQueue. %d : %d\n", witem.task_ID , witem.workload_ID, readyQueueHead, readyQueueSize);
    #endif
    // The signal is given to workers who are waiting for the queue to be empty
    pthread_cond_signal(&readyQWaitHandleAdd);
    pthread_mutex_unlock(&readyQMutex);
    return witem;
}

// Check if the reaadyQueue is empty with the corresponding mutex
int isReadyQueueEmpty()
{
    int res = 0;
    pthread_mutex_lock(&readyQMutex);
    res = (readyQueueSize == 0);
    pthread_mutex_unlock(&readyQMutex);
    return res;
}

// Assigning Task to the Worker (Cores)
struct outputsFromFPGA allocateTask(int task_ID, int workload_ID, int core_ID)
{
    if (flag == 1)
    {
        initDag();
        flag = 0;
    }
    #ifdef PRINT_AllocateTask
        printf("Task #%d (workload #%d) assigned to Core #%d;\n", task_ID, workload_ID, core_ID);
    #endif
    addToAssignedQueue( task_ID, workload_ID, core_ID);

    struct outputsFromFPGA FPGAOutputs;
    FPGAOutputs.task_ID = task_ID;
    FPGAOutputs.workload_ID = workload_ID;
    FPGAOutputs.core_ID = core_ID;
}

// Ending each task and inform the children
void taskDone(int task_ID, int workload_ID, int core_ID)
{
    struct Task_Package_Profile task_ = taskArray[task_ID];
    #ifdef PRINT_TaskDone
        printf("taskDone: Task #%d (workload #%d);\n", task_ID, workload_ID);
    #endif
    // Increase the child's parentReady variable and send the children to the ready queue if all parents are finished
    struct Task_Package_Profile *p_task_ = &(taskArray[task_ID]);
    for(int i = 0; i < p_task_->childCount; i++)
    {
        struct Task_Package_Profile *p_childTsk = &(taskArray[p_task_->child_task_ID[i]]);
        int nbParentReady = 0;
        // Increase the parentReady variable
        pthread_mutex_lock(&(p_childTsk->parentReadyMutex));
        nbParentReady = ++(p_childTsk->parentReady[workload_ID]);
        pthread_mutex_unlock(&(p_childTsk->parentReadyMutex));
        // Send the child to the ready queue if all parents are finished
        if (nbParentReady == p_childTsk->parentCount)
            addToReadyQueue(p_task_->child_task_ID[i], workload_ID);
    }
    pthread_mutex_lock(&assignQSizeCheckMutex);
    // Find the most empty assignedQueue and assign ready tasks as much as possible
    while(!isReadyQueueEmpty())
    {   // Finds the best assignedQueue
        int minQueue = 0;
        int minSize =  workerInfos[0].assignedQSize;
        for (int i = 1; i < CORE_MAX; i++)
        {
            if(workerInfos[i].assignedQSize < minSize)
            {
                minSize = workerInfos[i].assignedQSize;
                minQueue = i;
            }
        }
        // The most empty queue should be smaller than Deep so that it can be added to the queue
        if(minSize < DEEP)
        {
            struct workItem witem = readFromReadyQueue();
            struct outputsFromFPGA FPGAOutputs = allocateTask(witem.task_ID, witem.workload_ID, minQueue);
        }
        else
        {
            break;  // All assignedQueue are full
        }
    }
    pthread_mutex_unlock(&assignQSizeCheckMutex);
}

// Check the end of the program that has all the tests done
void finishCheck()
{
    if (wlCounter != WORKLOAD_MAX) return;
    for(int i = 0; i < CORE_MAX; i++)
    {
        if (workerInfos[i].assignedQSize > 0) return;
        if (workerInfos[i].coreState > 0) return;
    }
    if (!isReadyQueueEmpty()) return;
    canContinue_ = 0;
    for(int i = 0; i < CORE_MAX; i++)
        pthread_cond_signal(&(workerInfos[i].workerWaitHandle));
}

1 Ответ

0 голосов
/ 17 мая 2018

Синхронизация потоков может быть выполнена в HLS, как показано в , например, в этом документе , но она пока не поддерживается в Vivado HLS.

Это, как говорится, не означает, что невозможно реализовать ваше приложение на оборудовании. Один из подходов - реализовать каждый поток как отдельное аппаратное ядро. Общие данные могут быть помещены в другое ядро, что обеспечивает синхронизацию доступа к данным так, как вам нужно. Ядра могут связываться с общим объектом через потоковые интерфейсы. Вы можете реализовать параметры функции как потоковые интерфейсы с hls::stream. После реализации каждого ядра в виде IP-модуля вы можете подключить их через FIFO, сгенерированные с помощью генератора FIFO в блочном исполнении Vivado.

Вы можете создать, например, поток управления от каждого обрабатывающего ядра к общему объекту, который позволяет ядрам отправлять запрос на доступ к общему объекту. В общем объекте вы используете неблокирующие чтения из потоков, чтобы увидеть, хочет ли какой-либо из них получить эксклюзивный доступ. Затем вы принимаете запросы на запись или чтение только из потока управления от ядра, которому был предоставлен эксклюзивный доступ. Данные, связанные с чтением и записью, могут передаваться через выделенные потоки данных между ядрами и разделяемым объектом. Когда ядро ​​завершает работу с использованием общего объекта, оно может отправить команду освобождения, и общий объект снова начинает искать запросы во всех потоках управления. Требуется немного труда, но это возможное решение ...

...