Задача логики в C. Определить количество взаимодействий на pthreads - PullRequest
2 голосов
/ 19 мая 2019

Итак, у меня есть этот код, я хочу определить количество взаимодействий вручную, поэтому для каждого определяемого мной потока, скажем, 10 взаимодействий, чтобы каждый поток вычислял блок из 10. Если я сделаю это, поток не пойдет послепервые 10.

По сути, я хочу, чтобы каждый раз, когда поток завершал вычисление, 10 взаимодействий уходили и выполняли еще один блок из десяти, скажем, у взаимодействий 100 из 10 есть 10 блоков, которые я хочу, например, 4 потока, работающие, каждый вычисляетблок, когда закончится финиш, и возьмите другой блок

Кто-нибудь может помочь?

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define NTHREADS      4
#define ARRAYSIZE   1000000
#define ITERATIONS   ARRAYSIZE / NTHREADS

double  sum=0.0, a[ARRAYSIZE];
pthread_mutex_t sum_mutex;


void *do_work(void *tid) 
{
  int i, start, *mytid, end;
  double mysum=0.0;

  /* Initialize my part of the global array and keep local sum */
  mytid = (int *) tid;
  start = (*mytid * ITERATIONS);
  end = start + ITERATIONS;
  printf ("Thread %d doing iterations %d to %d\n",*mytid,start,end-1); 
  for (i=start; i < end ; i++) {
    a[i] = i * 1.0;
    mysum = mysum + a[i];
    }

  /* Lock the mutex and update the global sum, then exit */
  pthread_mutex_lock (&sum_mutex);
  sum = sum + mysum;
  pthread_mutex_unlock (&sum_mutex);
  pthread_exit(NULL);
}


int main(int argc, char *argv[])
{
  int i, start, tids[NTHREADS];
  pthread_t threads[NTHREADS];
  pthread_attr_t attr;

  /* Pthreads setup: initialize mutex and explicitly create threads in a
     joinable state (for portability).  Pass each thread its loop offset */
  pthread_mutex_init(&sum_mutex, NULL);
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  for (i=0; i<NTHREADS; i++) {
    tids[i] = i;
    pthread_create(&threads[i], &attr, do_work, (void *) &tids[i]);
    }

  /* Wait for all threads to complete then print global sum */ 
  for (i=0; i<NTHREADS; i++) {
    pthread_join(threads[i], NULL);
  }
  printf ("Done. Sum= %e \n", sum);

  sum=0.0;
  for (i=0;i<ARRAYSIZE;i++){ 
  a[i] = i*1.0;
  sum = sum + a[i]; }
  printf("Check Sum= %e\n",sum);

  /* Clean up and exit */
  pthread_attr_destroy(&attr);
  pthread_mutex_destroy(&sum_mutex);
  pthread_exit (NULL);
}

Я пробовал много раз, но не могу понять, как это сделать.Может быть, какое-то время внутри пустоты?есть идеи?

Ответы [ 2 ]

3 голосов
/ 19 мая 2019

Вы можете использовать любую библиотеку пула потоков, чтобы сделать это. Я изменил код и добавил дополнительную переменную index_to_start, которая решает, с чего начать подсчет.

Добавлены комментарии к коду, вы можете пройти его.

Для таких проблем я бы рекомендовал использовать библиотеку пула потоков, которая позаботится о большей части работы.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define NTHREADS      4
#define ARRAYSIZE   1000000
#define ITERATIONS   ARRAYSIZE / NTHREADS
// every thread will process BLOCK_SIZE numbers from array
#define BLOCK_SIZE      1000

double sum = 0.0, a[ARRAYSIZE];
// a mutex for index_to_start
pthread_mutex_t sum_mutex, index_mutex;
// this index tells thread that from where to start
static int index_to_start = 0;

void *do_work(void *tid) 
{
        int i, start, *mytid, end;
        double mysum = 0.0;

        /* Initialize my part of the global array and keep local sum */
        mytid = (int *)tid;

        // thread will be working untill index_to_start is less that ARRAYSIZE
        while (1) {
                // since index_to_start is shared, lock it
                pthread_mutex_lock(&index_mutex);
                if (index_to_start >= ARRAYSIZE) {
                        pthread_mutex_unlock(&index_mutex);
                        break;
                }
                // this is from where it should start counting
                start = index_to_start;

                // to find end just add BLOCK_SIZE to index_to_start and if it is going beyond ARRAYSIZE
                // just assign it to ARRAYSIZE
                if ((start + BLOCK_SIZE) < ARRAYSIZE)
                        index_to_start = end = start + BLOCK_SIZE;
                else 
                        index_to_start = end = ARRAYSIZE;

                // we are done with index_to_star, unlock the mutex
                pthread_mutex_unlock(&index_mutex);
                mysum = 0;    
                printf ("Thread %d doing iterations %d to %d\n", *mytid, start, end-1); 
                for (i = start; i < end ; i++) {
                        a[i] = i * 1.0;
                        mysum = mysum + a[i];
                }

                /* Lock the mutex and update the global sum, then exit */
                pthread_mutex_lock (&sum_mutex);
                sum = sum + mysum;
                pthread_mutex_unlock (&sum_mutex);
        }
        pthread_exit(NULL);
        return NULL;
}


int main(int argc, char *argv[])
{
        int i, start, tids[NTHREADS];
        pthread_t threads[NTHREADS];
        pthread_attr_t attr;

        /* Pthreads setup: initialize mutex and explicitly create threads in a
         *      joinable state (for portability).  Pass each thread its loop offset */
        pthread_mutex_init(&sum_mutex, NULL);
        pthread_mutex_init(&index_mutex, NULL);
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
        for (i=0; i<NTHREADS; i++) {
                tids[i] = i;
                pthread_create(&threads[i], &attr, do_work, (void *) &tids[i]);
        }

        /* Wait for all threads to complete then print global sum */ 
        for (i=0; i<NTHREADS; i++) {
                pthread_join(threads[i], NULL);
        }
        printf ("Done. Sum = %e\n", sum);

        sum = 0.0;
        for (i = 0; i < ARRAYSIZE; i++){ 
                sum = sum + a[i];
        }
        printf("Check Sum = %e\n",sum);

        /* Clean up and exit */
        pthread_attr_destroy(&attr);
        pthread_mutex_destroy(&sum_mutex);
        pthread_mutex_destroy(&index_mutex);
        pthread_exit (NULL);
}
0 голосов
/ 27 мая 2019

Итак, вот проблема, решаемая для тех, кто интересуется.

#include <pthread.h>    //utilizado para as pthreads
#include <stdio.h>      //utilizado para os printf
#include <stdlib.h>     //utilizado para o malloc

int nt,n,nbloco;                                                                        // defenição das variáveis base pedidas
int resultado_esperado = 0, *matriz, *buffer,sinal_saida,soma_global, items_buffer=0;   //defenição das variaveis udadas para calculo e controlo do programa

pthread_mutex_t indice_mutex, criacao_thread_mutex,buffer_mutex,soma_final_mutex;       // defenição dos mutex para exclusividade, poderia ser usado um so mutex para garantir que nao exista confusoes de desbloqueio opteou-se por criacao de mutex exclusivos para as operacoes

//CASO USE O PROGRAMA COM A FUNCAO SCHED YIELD
//DESATIVAR AS CONDICOES PRODUZIR E CONSUMIR

pthread_cond_t cond/*, produzir, consumir*/;                                                // defenição de condições para sinalizar threads criadas e sinalizaao de buffer cheio (mais rapido do que usar o sched_yield as duas opcoes estao acessiveis)

static int numero_para_comeco = 0;                                                      //numero de controlo para saber que bloco a thread vai processar

typedef struct {

    //esta estrutara vai passar os valores para as threads 
    //valores que vão diferir de uma thread para 
    //a outra, dái se encontrarem dentro da estrutura.

  int inicio_inicial;
  int id_threads;

}estrutura_geral;

// esta primeira função representa a função das tarefas
// calculadoras está dividida em duas partes um primeira 
//que processa o bloco associado de inicio conforme a sequencia de numero de ordem
//(cada thread obriagtoriamento computa um bloco) 
// e uma segunda que permite qualquer thread agarrar um novo bloco
// esta funç]ao tem um apontador que direciona para a estrutura

void *calculadoras(void *es)                                        
{
        int i, inicio, fim, contador;                                           //defenicao de variaveis
        int calculo_parcial = 0;                                    //defenicao variavel soma parcial

        pthread_mutex_lock(&criacao_thread_mutex);                  //bloqueio do mutex para garantir a leitura da estrutura e o envio de sinal da thread criada

        estrutura_geral * const estrutura = es;                     //pointer é criado para a estrutura

        int const id_threads = estrutura->id_threads;               //leitura dos elemnetos da estrutura
        int const inicio_inicial = estrutura->inicio_inicial;

        contador=0;                                                 //incializaçao do conatdor

        if ((inicio_inicial+nbloco) < n){                           //verificaçao do tamanho do bloco a calcular
            fim = inicio_inicial + nbloco;
        }else{
            fim = n;
        }

        pthread_cond_signal(&cond);                                 //sinalizacao à main thread que a thread foi criada

        pthread_mutex_unlock(&criacao_thread_mutex);                //desbloqueio do mutex para a variavel de condicao na main thread ser usada

        sched_yield();                                              //nao é necessário estar aqui mas ao fazer garanto que outras threads tenham ainda mais oportunidade.

        //printf ("A thread %d está a calcular o bloco de %d a %d\n", id_threads, inicio_inicial+1, fim); 

        //primeira parte da computacao onde os valores para
        //soma passados orbriatoriamente por oredem de criacao da thread

                //pthread_mutex_lock (&buffer_mutex);                   //(quando usar as condiçoes ou fazer o calculo do bloco completo)estamos a entrar numa regiao onde a computacao vai ser feira precisamos de boquea o mutex para que nao exista perda de dados

                 while(items_buffer==nt){                           

                 //enquanto o buffer for igual ao numero de tarefas
                 //existem duas opçoes ou a thread manda um sinal à thread que
                 //soma os valores do buffer para limpar o buffer
                 //ou entao podemos optar por nao mandar o sinal desbloquear o mutex
                 // e simplesmente libertar a tarefa com sched_yield
                 //o sinal acaba por se mais eficaz pois nao e certo que com o sched yield 
                 //a proxima tarefa seja a somadora

                            //pthread_cond_wait(&produzir,&buffer_mutex);
                            //pthread_mutex_unlock(&buffer_mutex);              //ativar quando o mutex for bloqueado anteriormente
                            sched_yield();                                      //PARA ACTIVAR O SCHED YIELD DESATIVE A CONDIÇAO E ATIVE O MUTEX
                        }

                    for (i = inicio_inicial; i < fim ; i++) {


                        calculo_parcial+= matriz[i]*matriz[i];          //calculo da soma parcial
                        contador=contador+1;                            //conatra elemento somado

                }

                pthread_mutex_lock (&buffer_mutex);                     //regiao critica assegura-se que tem exlusividade para gravar os dados

                buffer[items_buffer+1]=calculo_parcial;                 //envio dos items para o buffer
                items_buffer=items_buffer+1;                            // contador de items no buffer

                //printf("o meu buffer tem %d items a soma parcial é de %d e o buffer tem %d\n",items_buffer,calculo_parcial,buffer[items_buffer]);
                //printf ("A thread %d calculou o bloco de %d a %d\n", id_threads, inicio_inicial+1, fim); 

                pthread_mutex_unlock(&buffer_mutex);                    //desbloqueio do mutex para libertar o processador às outras threads
                //pthread_cond_signal(&consumir);                           // sinalizar a thread somador que existem items no buffer mais uma vez poderiamos usar o sched yield, que nao seria tao eficaz
                sched_yield();                                          //PODE ATIVAR O SCHED YIED PARA ISSO DESATIVE A CONDIÇAO

                //a partir deste momento caso exitam blocos
                //por computar as threads vao agarrar um novo bloco e computalo
                //segue exatamente a mesma estrutura indicada em cima
                //mas agora nao existe obrigatoriedade de cada thread ter um bloco

        while (1) {

                pthread_mutex_lock(&indice_mutex);

                if (numero_para_comeco >= n) {
                        pthread_mutex_unlock(&indice_mutex);
                        break;
                }

                inicio = numero_para_comeco;

                if ((inicio + nbloco) < n)
                        numero_para_comeco = fim = inicio + nbloco;
                else 
                        numero_para_comeco = fim = n;


                pthread_mutex_unlock(&indice_mutex);

                calculo_parcial = 0;                        // inicializaçao da soma parcial de volta a 0                       

                //printf ("A thread %d está a calcular o bloco de %d a %d\n", id_threads, inicio+1, fim); 

                    //pthread_mutex_lock (&buffer_mutex);

                    while(items_buffer==nt){

                            //pthread_cond_wait(&produzir,&buffer_mutex);
                            //pthread_mutex_unlock(&buffer_mutex);
                            sched_yield();                              //PARA ACTIVAR O SCHED YIELD DESATIVE A CONDIÇAO E ATIVE O MUTEX
                        }

                    for (i = inicio; i < fim ; i++) {

                        calculo_parcial+= matriz[i]*matriz[i];
                        contador=contador+1;                            //conatra elemento somado

                }

                pthread_mutex_lock (&buffer_mutex);

                buffer[items_buffer+1]=calculo_parcial;
                items_buffer=items_buffer+1;

                //printf("o meu buffer tem %d items a soma parcial é de %d e o buffer tem %d\n",items_buffer,calculo_parcial,buffer[items_buffer]);
                //printf ("A thread %d calculou o bloco de %d a %d\n", id_threads, inicio+1, fim);

                pthread_mutex_unlock (&buffer_mutex);
                //pthread_cond_signal(&consumir);
                sched_yield();                      //PODE ATIVAR O SCHED YIED PARA ISSO DESATIVE A CONDIÇAO
        }

        sinal_saida=sinal_saida+1;                      //forma de sinalizar que a thread saiu para que a thread de soma e que limpa o buffer saiba que pode acabar

        printf("tarefa %d calculou %d elementos\n",id_threads,contador);
        //printf("tarefa %d de saída\n",id_threads);

        pthread_exit(NULL);

}

//aqui é apresentada a funcao que soma as somas parcias que estao no buffer e o limpa

void *somadora(void *ts) 
{

    pthread_mutex_lock(&criacao_thread_mutex);          //bloqueamos o mutex para que seja dado o sinal de que a thread foi criada

    //printf("Sou a thread somadora\n");

    pthread_cond_signal(&cond);                         //sinalizamos a main thread que a thread foi criada

    pthread_mutex_unlock(&criacao_thread_mutex);        //desbloqueio do mutex para que as threads estejam á vontade

    pthread_mutex_lock(&buffer_mutex);                  //estramos numa operaçao critica onde os dados nao se podem perder, bloqueamos o mutex

        while(items_buffer==0){

            //emquanto o buffer tiver 0 elemnetos
            //sinalizamos as threads que podem produzir
            //é feita entao uma condicao de espera ou 
            //podemos usar um sched yield

            //pthread_cond_wait(&consumir,&buffer_mutex);
            pthread_mutex_unlock(&buffer_mutex);                //PARA ACTIVAR O SCHED YIELD DESATIVE A CONDIÇAO E ATIVE O MUTEX
            sched_yield();
        }

        while(sinal_saida<nt){                      //enquanto todas as thread nao se extinguirem esta condicao é valida

            while(items_buffer!=0){                 //sempre que o buffer é diferente de 0 é calculado a soma das somas parciais e o buffer é esvaziado

                soma_global+=buffer[items_buffer];  //actualizacao da soma global
                items_buffer=items_buffer-1;        //reduçao do buffer

                //printf("o meu buffer ficou com %d items\n",items_buffer);

            }

            pthread_mutex_unlock(&buffer_mutex);    //computacao realizada podemos desbloquear o mutex
            //pthread_cond_signal(&produzir);           //envio de sinal que as threads podem produzir realizar mais somas parciais
            sched_yield();                      //PODE ATIVAR O SCHED YIED PARA ISSO DESATIVE A CONDIÇAO
        }

        //quando todas as thread terminaram
        //a tarefa soma terá que rodar mais uma
        //para verificar se nao sobraram elementos no buffer_mutex
        //a logica é a mesma apresentada anteriormente

        pthread_mutex_lock(&soma_final_mutex);      

        while(items_buffer!=0){

                soma_global+=buffer[items_buffer];
                items_buffer=items_buffer-1;

                //printf("o meu buffer ficou com %d items\n",items_buffer);
            }

        pthread_mutex_unlock(&soma_final_mutex);

        //printf("Sou a thread somadora estou de saida\n");

        pthread_exit(NULL);

}

//funçao princial

int main(int argc, char *argv[])
{
        int i,z;                            //defeinao de variaveis

        //recolha de elementos da linha de comandos

        nt=atoi(argv[1]);
        n=atoi(argv[2]);
        nbloco=atoi(argv[3]);

        //verificacao dos elementos inceridos pelo utilizador

        if(argc!=4){
            printf("Utilização: ./mtss nt n nbloco\n");
            exit(1);}

        if(nt<1){
            printf("O numero de processos terá que ser pelo menos 1\n");
            exit(1);}

        if(n<1||n>999){
        printf("O n tem que estar comprefimido entre 1 e 999\n");
        exit(1);}

        if(nbloco<1){
        printf("O bloco tem que ser pelo menos 1\n");
        exit(1);
        }

        printf("Soma do quadrado dos %d primeiros numeros naturais com %d tarefas e blocos de %d termos\n",n,nt,nbloco);

        //defeniçao de threads e attributos

        pthread_t threads_calculadora[nt];
        pthread_t thread_soma;
        pthread_attr_t attr;

        //alocacar espaço para a estrutura que vai ser passada às threads

        estrutura_geral * estrutura = malloc(sizeof(estrutura_geral));

        //alocarçao de espaço para a matriz com os valores de calculo e para o buffer

        matriz = malloc(sizeof(int)*n);
        buffer = malloc(sizeof(int)*nt);

        // preenchimento da matriz com os valores de n

        for(z=0;z<n;z++){

            matriz[z]=z+1;

        }

        //inicializaçao dos mutex

        pthread_mutex_init(&indice_mutex, NULL);
        pthread_mutex_init(&criacao_thread_mutex,NULL);
        pthread_mutex_init(&soma_final_mutex,NULL);
        pthread_mutex_init(&buffer_mutex,NULL);

        //inicializaçao das condicoes

        pthread_cond_init(&cond,NULL);
        //pthread_cond_init(&produzir,NULL);                    //DESTIVAR EM CASO DE USO DO SCHED YIELD
        //pthread_cond_init(&consumir,NULL);                    //DESTIVAR EM CASO DE USO DO SCHED YIELD

        // inicializacao e defenicao de atributos

        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); //este atributo já é predefenido mas nunca é demais garantir

        numero_para_comeco=nbloco*nt;       //defenicao da variavel que controla o numero para a thread começar quando está no loop while(1)
        estrutura->inicio_inicial=0;        //inicializaçao da variavel

        //criaçao da thread soma que usara a funcao somadora e reduzira o buffer

        pthread_create(&thread_soma, &attr, somadora,estrutura);
        pthread_cond_wait(&cond,&criacao_thread_mutex);             //espera o sinal que a thread está criada

        //criaçao das threads calculadoras

        for (i=0; i<nt; i++) {

                ++estrutura->id_threads;            //numero de ordem da thread

                pthread_create(&threads_calculadora[i], &attr, calculadoras,estrutura);     //cria a thread

                estrutura->inicio_inicial=i*nbloco; //define o inicio da thread

                pthread_cond_wait(&cond,&criacao_thread_mutex); //espera que seja sinalizada que a thread foi criada
        }

        //espera que todas a threads terminem

        for (i=0; i<nt; i++) {

                pthread_join(threads_calculadora[i], NULL);

        }

        pthread_join(thread_soma, NULL);

        resultado_esperado = (n*(n+1)*((2*n)+1))/6;

        printf("Soma Total= %d\n",soma_global);
        printf("Resultado esperado = %d\n",resultado_esperado);

        //Libertar memória

        pthread_attr_destroy(&attr);
        pthread_mutex_destroy(&indice_mutex);
        pthread_mutex_destroy(&criacao_thread_mutex);
        pthread_mutex_destroy(&soma_final_mutex);
        pthread_mutex_destroy(&buffer_mutex);
        //pthread_cond_destroy(&produzir);                      //DESTIVAR EM CASO DE USO DO SCHED YIELD
        //pthread_cond_destroy(&consumir);                      //DESTIVAR EM CASO DE USO DO SCHED YIELD

        return 0;
}
...