Я написал пример приложения, в котором родительский (производитель) порождает N дочерних (потребительских) потоков и использует глобальный массив для обмена данными между ними.Глобальный массив представляет собой циклическую память и защищен при записи данных в массив.
Заголовочный файл:
#define TRUE 1
#define FALSE 0
#define SUCCESS 0
#define FAILURE -1
/*
* Function Declaration.
*/
void Parent( void );
int CalcBitmap(unsigned int Num);
void InitSemaphore( void );
void DeInitSemaphore( void );
int ComputeComplexCalc( int op1, int op2);
/*
* Thread functions.
*/
void *Child( void *arg );
void *Report( void *arg1 );
/*
* Macro Definition.
*/
#define INPUT_FILE "./input.txt"
#define NUM_OF_CHILDREN 10
#define SIZE_CIRCULAR_BUF 256
#define BUF_SIZE 128
//Set the bits corresponding to all threads.
#define SET_ALL_BITMAP( a ) a = uiBitmap;
//Clears the bit corresponding to a thread after
//every read.
#define CLEAR_BIT( a, b ) a &= ~( 1 << b );
/*
* Have a dedicated semaphore for each buffer
* to synchronize the acess to the shared memory
* between the producer (parent) and the consumers.
*/
sem_t ReadCntrMutex;
sem_t semEmptyNode[SIZE_CIRCULAR_BUF];
/*
* Global Variables.
*/
unsigned int uiBitmap = 0;
//Counter to track the number of processed buffers.
unsigned int Stats;
unsigned int uiParentCnt = 0;
char inputfile[BUF_SIZE];
int EndOfFile = FALSE;
/*
* Data Structure Definition. ( Circular Buffer )
*/
struct Message
{
int id;
unsigned int BufNo1;
unsigned int BufNo2;
/* Counter to check if all threads read the buffer. */
unsigned int uiBufReadCntr;
};
struct Message ShdBuf[SIZE_CIRCULAR_BUF];
Исходный код:
/*
#
# ipc_communicator is a IPC binary where a parent
# create ten child threads, read the input parameters
# from a file, sends that parameters to all the children.
# Each child computes the operations on those input parameters
# and writes them on to their own file.
#
# Author: Ashok Vairavan Date: 8/8/2014
*/
/*
* Generic Header Files.
*/
#include "stdio.h"
#include "unistd.h"
#include "string.h"
#include "pthread.h"
#include "errno.h"
#include "malloc.h"
#include "fcntl.h"
#include "getopt.h"
#include "stdlib.h"
#include "math.h"
#include "semaphore.h"
/*
* Private Header Files.
*/
#include "ipc*.h"
/*
* Function to calculate the bitmap based on the
* number of threads. This bitmap is used to
* track which thread read the buffer and the
* pending read threads.
*/
int CalcBitmap(unsigned int Num)
{
unsigned int uiIndex;
for( uiIndex = 0; uiIndex < Num; uiIndex++ )
{
uiBitmap |= 1 << uiIndex;
}
return uiBitmap;
}
/*
* Function that performs complex computation on
* numbers.
*/
int ComputeComplexCalc( int op1, int op2)
{
return sqrt(op1) + log(op2);
}
/*
* Function to initialise the semaphores.
* semEmptyNode indicates if the buffer is available
* for write. semFilledNode indicates that the buffer
* is available for read.
*/
void InitSemaphore( void )
{
unsigned int uiIndex=0;
CalcBitmap(NUM_OF_CHILDREN);
sem_init( &ReadCntrMutex, 0, 1);
while( uiIndex<SIZE_CIRCULAR_BUF )
{
sem_init( &semEmptyNode[uiIndex], 0, 1 );
uiIndex++;
}
return;
}
/*
* Function to de-initilaise semaphore.
*/
void DeInitSemaphore( void )
{
unsigned int uiIndex=0;
sem_destroy( &ReadCntrMutex);
while( uiIndex<SIZE_CIRCULAR_BUF )
{
sem_destroy( &semEmptyNode[uiIndex] );
uiIndex++;
}
return;
}
/* Returns TRUE if the parent had reached end of file */
int isEOF( void )
{
return __sync_fetch_and_add(&EndOfFile, 0);
}
/*
* Thread functions that prints the number of buffers
* processed per second.
*/
void *Report( void *arg1 )
{
int CumStats = 0;
while( TRUE )
{
sleep( 1 );
CumStats += __sync_fetch_and_sub(&Stats, 0);
printf("Processed Per Second: %d Cumulative Stats: %d\n",
__sync_fetch_and_sub(&Stats, Stats), CumStats);
}
return NULL;
}
/*
* Function that reads the data from the input file
* fills the shared buffer and signals the children to
* read the data.
*/
void Parent( void )
{
unsigned int op1, op2;
unsigned int uiCnt = 0;
/* Read the input parameters and process it in
* while loop till the end of file.
*/
FILE *fp = fopen( inputfile, "r" );
while( fscanf( fp, "%d %d", &op1, &op2 ) == 2 )
{
/* Wait for the buffer to become empty */
sem_wait(&semEmptyNode[uiCnt]);
/* Access the shared buffer */
ShdBuf[uiCnt].BufNo1 = op1;
ShdBuf[uiCnt].BufNo2 = op2;
/* Bitmap to track which thread read the buffer */
sem_wait( &ReadCntrMutex );
SET_ALL_BITMAP( ShdBuf[uiCnt].uiBufReadCntr );
sem_post( &ReadCntrMutex );
__sync_fetch_and_add( &uiParentCnt, 1);
uiCnt = uiParentCnt % SIZE_CIRCULAR_BUF;
}
/* If it is end of file, indicate it to other
* children using global variable.
*/
if( feof( fp ) )
{
__sync_add_and_fetch(&EndOfFile, TRUE);
fclose( fp );
return;
}
return;
}
/*
* Child thread function which takes threadID as an
* argument, creates a file using threadID, fetches the
* parameters from the shared memory, computes the calculation,
* writes the output to their own file and will release the
* semaphore when all the threads read the buffer.
*/
void *Child( void *ThreadPtr )
{
unsigned int uiCnt = 0, uiTotalCnt = 0;
unsigned int op1, op2;
char filename[BUF_SIZE];
int ThreadID;
ThreadID = *((int *) ThreadPtr);
free( ThreadPtr );
snprintf( filename, BUF_SIZE, "Child%d.txt", ThreadID );
FILE *fp = fopen( filename, "w" );
if( fp == NULL )
{
perror( "fopen" );
return NULL;
}
while (TRUE)
{
/* Wait till the parent fills the buffer */
if( __sync_fetch_and_add( &uiParentCnt, 0 ) > uiTotalCnt )
{
/* Access the shared memory */
op1 = ShdBuf[uiCnt].BufNo1;
op2 = ShdBuf[uiCnt].BufNo2;
fprintf(fp, "%d %d = %d\n", op1, op2,
ComputeComplexCalc( op1, op2) );
sem_wait( &ReadCntrMutex );
ShdBuf[uiCnt].uiBufReadCntr &= ~( 1 << ThreadID );
if( ShdBuf[uiCnt].uiBufReadCntr == 0 )
{
__sync_add_and_fetch(&Stats, 1);
/* Release the semaphore lock if
all readers read the data */
sem_post(&semEmptyNode[uiCnt]);
}
sem_post( &ReadCntrMutex );
uiTotalCnt++;
uiCnt = uiTotalCnt % SIZE_CIRCULAR_BUF;
/* If the parent reaches the end of file and
if the child thread read all the data then break out */
if( isEOF() && ( uiTotalCnt == uiParentCnt ) )
{
//printf("Thid %d p %d c %d\n", ThreadID, uiParentCnt, uiCnt );
break;
}
}
else
{
/* Sleep for ten micro seconds before checking
the shared memory */
usleep(10);
}
}
fclose( fp );
return NULL;
}
void usage( void )
{
printf(" Usage:\n");
printf(" -f - the absolute path of the input file where the input parameters are read from.\n\t\t Default input file: ./input.txt");
printf(" -? - Help Menu. \n" );
exit(1);
}
int main( int argc, char *argv[])
{
pthread_attr_t ThrAttr;
pthread_t ThrChild[NUM_OF_CHILDREN], ThrReport;
unsigned int uiThdIndex = 0;
unsigned int *pThreadID;
int c;
strncpy( inputfile, INPUT_FILE, BUF_SIZE );
while( ( c = getopt( argc, argv, "f:" )) != -1 )
{
switch( c )
{
case 'f':
strncpy( inputfile, optarg, BUF_SIZE );
break;
default:
usage();
}
}
if( access( inputfile, F_OK ) == -1 )
{
perror( "access" );
return FAILURE;
}
InitSemaphore();
pthread_attr_init( &ThrAttr );
while( uiThdIndex< NUM_OF_CHILDREN )
{
/* Allocate the memory from the heap and pass it as an argument
* to each thread to avoid race condition and invalid reference
* issues with the local variables.
*/
pThreadID = (unsigned int *) malloc( sizeof( unsigned int ) );
if( pThreadID == NULL )
{
perror( "malloc" );
/* Cancel pthread operation */
return FAILURE;
}
*pThreadID = uiThdIndex;
if( pthread_create(
&ThrChild[uiThdIndex], NULL, &Child, pThreadID) != 0 )
{
printf( "pthread %d creation failed. Error: %d\n", uiThdIndex, errno);
perror( "pthread_create" );
return FAILURE;
}
uiThdIndex++;
}
/* Report thread reports the statistics of IPC communication
* between parent and childs threads.
*/
if( pthread_create( &ThrReport, NULL, &Report, NULL) != 0 )
{
perror( "pthread_create" );
return FAILURE;
}
Parent();
uiThdIndex = 0;
while( uiThdIndex < NUM_OF_CHILDREN )
{
pthread_join( ThrChild[uiThdIndex], NULL );
uiThdIndex++;
}
/*
* Cancel the report thread function when all the
* children exits.
*/
pthread_cancel( ThrReport );
DeInitSemaphore();
return SUCCESS;
}