Многозадачность как заставить рабочий поток получить контроль после вызова функции бесконечного цикла - PullRequest
0 голосов
/ 06 июня 2010

предполагается создание 3 рабочих потоков с помощью pthread_create,

в этой подпрограмме рабочего потока каждый вызывает простую функцию бесконечного цикла, которая не имеет возврата для подсчета

как заставить рабочий поток получить управление после вызова функции бесконечного цикла и сохранить контекст функции бесконечного цикла для повторного вызова в рабочем потоке?

Ответы [ 3 ]

0 голосов
/ 06 июня 2010

хотят создать эффект пула потоков, после вызова функции бесконечного цикла каждый рабочий поток может изменить другие задачи (другую функцию бесконечного цикла) для запуска

например, 3 рабочих потока могут выполнять 4 задачи (функции с бесконечным циклом)

#ifndef JOB_CPP
#define JOB_CPP

#include "job.h"

#define NUM_OF_TASKS 4
#define NUM_OF_WORKERS 3
    void (* job_queue[NUM_OF_TASKS])(void*);
    void (* fp[NUM_OF_WORKERS])(void*); // original running job
    int running_task[NUM_OF_WORKERS];
    int idle[NUM_OF_TASKS];
    int last_running_task[NUM_OF_WORKERS];
    int no_of_tasks_running[NUM_OF_WORKERS];
    my_struct_t data = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0};

void func1(void *arg)
{
    int count = 0;
    int status;
    while(true)
    {
        //if((count % 100) == 0)
        //printf("func1 run %d\n", count);
        count = count + 1;
        //status = pthread_cond_signal(&data.cv);
    }
}
void func2(void *arg)
{
    int count = 0;
    int status;
    while(true)
    {
        //printf("func2 run %d\n", count);
        count = count + 1;
        //status = pthread_cond_signal(&data.cv);
    }
}
void func3(void *arg)
{   int count = 0;
    int status;
    while(true)
    {
        //printf("func3 run %d\n", count);
        count = count + 1;
        //status = pthread_cond_signal(&data.cv);
    }
}
void func4(void *arg)
{   int count = 0;
    int status;
    while(true)
    {
        //printf("func4 run %d\n", count);
        count = count + 1;
        //status = pthread_cond_signal(&data.done);
    }
}

void jobinit()
{
    for(int i=0; i<NUM_OF_TASKS; i++)
    {
        job_queue[i] = NULL;
        idle[i] = 0;
    }
    for(int i=0; i<NUM_OF_WORKERS; i++)
    {
        fp[i] = NULL;
        running_task[i] = 0;
        last_running_task[i] = 0;
        no_of_tasks_running[i] = 0;
    }
    jobadd(func1);
    jobadd(func2);
    jobadd(func3);
    jobadd(func4);
    jobrun();
}
void jobadd(void (*job)(void*))
{   
    for(int i=0; i<4; i++)
    {
        if(job_queue[i] == NULL)
        {
            job_queue[i] = job;
            return;
        }
    }
}
void* workserver(void *arg);
void* workserver(void *arg)
{
    int status, timedout;

    struct timespec timeout;

    status = pthread_mutex_lock(&data.mutex);
    while(true)
    {
        timedout = 0;   
        clock_gettime(CLOCK_REALTIME, &timeout);
        timeout.tv_sec += 2;

        sleep(1);
        //void (* clean)(void*);

        status = pthread_cond_timedwait(&data.cv, &data.mutex, &timeout);
        if(status == ETIMEDOUT){
            printf("worker wait timed out %d\n", (int)arg);
            timedout = 1;
        }else if(status != 0){
            printf("worker wait failed %d\n", (int)arg);
            status = pthread_mutex_unlock(&data.mutex);
            return NULL;
        }
        printf("workserver number: %d\n", (int)arg);

        status = pthread_mutex_unlock(&data.mutex);     

        printf("function run %d\n", (int)arg);
        (* job_queue[(int)arg])(NULL);

        printf("cond wait start %d\n", (int)arg);
        status = pthread_cond_wait(&data.done, &data.mutex);
        printf("cond wait end\n");

        status = pthread_mutex_lock(&data.mutex);
    }
}
void jobrun() 
{   
    for(int i=0; i<3; i++) {idle[i] = 0;}
    pthread_t r1_threadid[3];

    for(int i=0; i<3; i++)  
    {
        pthread_create(&r1_threadid[i], NULL, workserver, (void*)i);
    }

    int status;
    struct timespec timeout;

    timeout.tv_sec = time (NULL) + 2;
    timeout.tv_nsec = 0;

    while(true)
    {
    status = pthread_mutex_lock(&data.mutex);
    while(data.value == 0)
    {
        status = pthread_cond_timedwait(&data.cond, &data.mutex, &timeout);
    }
    if(data.value != 0)
    {
        //printf("condition was signaled\n");
        data.value = 0;
    }
    status = pthread_mutex_unlock(&data.mutex);
    if(status != 0)
        printf("unlock mutex error");
    }
}
#endif
0 голосов
/ 06 июня 2010

Я думаю, вы должны уточнить свой вопрос.

Если каждый рабочий поток вызывает бесконечный цикл, то я предполагаю, что ваш главный поток должен будет вызвать pthread_cancel () для каждого из них. Из того, что я понял, для этого может потребоваться вызов других функций pthread _ * () для установки «отменяемости» целевых потоков.

Конечно, это предложение напрашивается на вопрос. Чрезвычайно предпочтительным подходом было бы предотвращение этих бесконечных циклов. Напишите свой код так, чтобы он имел условия выхода ... чтобы работа была ограничена каким-либо вводом или имела какую-то обработку событий.

0 голосов
/ 06 июня 2010

Позвольте мне перефразировать, чтобы понять, понял ли я проблему.

У вас есть главный поток, который порождает 3 рабочих потока, каждый из которых выполняет длительную (бесконечную) работу.

В определенный момент вы хотите прервать обработку, сохраните состояние всех потоков, чтобы возобновить их с того места, где они остановились позднее.

Я думаю, что лучший способ сделать это - организовать работу потоков в блоках транзакций. При перезапуске вы проверяете последнюю завершенную транзакцию и идете оттуда.

Но так как я подозреваю, что это домашнее задание в низкоуровневом канале, могу я предложить общий логический параметр, который проверяется каждый раз, когда вы проходите цикл, чтобы выйти и сохранить состояние после этого. В качестве альтернативы «убить» поток, перехватить исключение и сохранить состояние. Последний вариант грязный.

...