У меня есть следующие две программы: одна выступает в роли читателя, а другая - в качестве писателя. Автор, кажется, отправляет только около 3/4 данных для правильного чтения читателем. Есть ли способ гарантировать, что все данные отправляются? Я думаю, что я настроил его так, чтобы он надежно считывал и записывал, но все равно кажется, что он пропускает 1/4 данных.
Вот источник писателя
#define pipe "/tmp/testPipe"
using namespace std;
queue<string> sproutFeed;
ssize_t r_write(int fd, char *buf, size_t size) {
char *bufp;
size_t bytestowrite;
ssize_t byteswritten;
size_t totalbytes;
for (bufp = buf, bytestowrite = size, totalbytes = 0;
bytestowrite > 0;
bufp += byteswritten, bytestowrite -= byteswritten) {
byteswritten = write(fd, bufp, bytestowrite);
if(errno == EPIPE)
{
signal(SIGPIPE,SIG_IGN);
}
if ((byteswritten) == -1 && (errno != EINTR))
return -1;
if (byteswritten == -1)
byteswritten = 0;
totalbytes += byteswritten;
}
return totalbytes;
}
void* sendData(void *thread_arg)
{
int fd, ret_val, count, numread;
string word;
char bufpipe[5];
ret_val = mkfifo(pipe, 0777); //make the sprout pipe
if (( ret_val == -1) && (errno != EEXIST))
{
perror("Error creating named pipe");
exit(1);
}
while(1)
{
if(!sproutFeed.empty())
{
string s;
s.clear();
s = sproutFeed.front();
int sizeOfData = s.length();
snprintf(bufpipe, 5, "%04d\0", sizeOfData);
char stringToSend[strlen(bufpipe) + sizeOfData +1];
bzero(stringToSend, sizeof(stringToSend));
strncpy(stringToSend,bufpipe, strlen(bufpipe));
strncat(stringToSend,s.c_str(),strlen(s.c_str()));
strncat(stringToSend, "\0", strlen("\0"));
int fullSize = strlen(stringToSend);
signal(SIGPIPE,SIG_IGN);
fd = open(pipe,O_WRONLY);
int numWrite = r_write(fd, stringToSend, strlen(stringToSend) );
cout << errno << endl;
if(errno == EPIPE)
{
signal(SIGPIPE,SIG_IGN);
}
if(numWrite != fullSize )
{
signal(SIGPIPE,SIG_IGN);
bzero(bufpipe, strlen(bufpipe));
bzero(stringToSend, strlen(stringToSend));
close(fd);
}
else
{
signal(SIGPIPE,SIG_IGN);
sproutFeed.pop();
close(fd);
bzero(bufpipe, strlen(bufpipe));
bzero(stringToSend, strlen(stringToSend));
}
}
else
{
if(usleep(.0002) == -1)
{
perror("sleeping error\n");
}
}
}
}
int main(int argc, char *argv[])
{
signal(SIGPIPE,SIG_IGN);
int x;
for(x = 0; x < 100; x++)
{
sproutFeed.push("All ships in the sea sink except for that blue one over there, that one never sinks. Most likley because it\'s blue and thats the mightiest colour of ship. Interesting huh?");
}
int rc, i , status;
pthread_t threads[1];
printf("Starting Threads...\n");
pthread_create(&threads[0], NULL, sendData, NULL);
rc = pthread_join(threads[0], (void **) &status);
}
Вот источник читателя
#define pipe "/tmp/testPipe"
char dataString[50000];
using namespace std;
char *getSproutItem();
void* readItem(void *thread_arg)
{
while(1)
{
x++;
char *s = getSproutItem();
if(s != NULL)
{
cout << "READ IN: " << s << endl;
}
}
}
ssize_t r_read(int fd, char *buf, size_t size) {
ssize_t retval;
while (retval = read(fd, buf, size), retval == -1 && errno == EINTR) ;
return retval;
}
char * getSproutItem()
{
cout << "Getting item" << endl;
char stringSize[4];
bzero(stringSize, sizeof(stringSize));
int fd = open(pipe,O_RDONLY);
cout << "Reading" << endl;
int numread = r_read(fd,stringSize, sizeof(stringSize));
if(errno == EPIPE)
{
signal(SIGPIPE,SIG_IGN);
}
cout << "Read Complete" << endl;
if(numread > 1)
{
stringSize[numread] = '\0';
int length = atoi(stringSize);
char recievedString[length];
bzero(recievedString, sizeof(recievedString));
int numread1 = r_read(fd, recievedString, sizeof(recievedString));
if(errno == EPIPE)
{
signal(SIGPIPE,SIG_IGN);
}
if(numread1 > 1)
{
recievedString[numread1] = '\0';
cout << "DATA RECIEVED: " << recievedString << endl;
bzero(dataString, sizeof(dataString));
strncpy(dataString, recievedString, strlen(recievedString));
strncat(dataString, "\0", strlen("\0"));
close(fd);
return dataString;
}
else
{
return NULL;
}
}
else
{
return NULL;
}
close(fd);
}
int main(int argc, char *argv[])
{
int rc, i , status;
pthread_t threads[1];
printf("Starting Threads...\n");
pthread_create(&threads[0], NULL, readItem, NULL);
rc = pthread_join(threads[0], (void **) &status);
}