Как гарантировать, что read () на самом деле отправляет 100% данных, отправленных методом write () через именованные каналы - PullRequest
0 голосов
/ 25 мая 2009

У меня есть следующие две программы: одна выступает в роли читателя, а другая - в качестве писателя. Автор, кажется, отправляет только около 3/4 данных для правильного чтения читателем. Есть ли способ гарантировать, что все данные отправляются? Я думаю, что я настроил его так, чтобы он надежно считывал и записывал, но все равно кажется, что он пропускает 1/4 данных.

Вот источник писателя

#define pipe "/tmp/testPipe"

using namespace std;

queue<string> sproutFeed;


ssize_t r_write(int fd, char *buf, size_t size) {
   char *bufp;
   size_t bytestowrite;
   ssize_t byteswritten;
   size_t totalbytes;

   for (bufp = buf, bytestowrite = size, totalbytes = 0;
        bytestowrite > 0;
        bufp += byteswritten, bytestowrite -= byteswritten) {
      byteswritten = write(fd, bufp, bytestowrite);
            if(errno == EPIPE)
            {
            signal(SIGPIPE,SIG_IGN);
            }
      if ((byteswritten) == -1 && (errno != EINTR))
         return -1;
      if (byteswritten == -1)
         byteswritten = 0;
      totalbytes += byteswritten;
   }
   return totalbytes;
}


void* sendData(void *thread_arg)
{

int fd, ret_val, count, numread;
string word;
char bufpipe[5];


ret_val = mkfifo(pipe, 0777); //make the sprout pipe

if (( ret_val == -1) && (errno != EEXIST)) 
{
    perror("Error creating named pipe");
    exit(1);
}   
while(1)
{
    if(!sproutFeed.empty())
    {
        string s;
        s.clear();
        s = sproutFeed.front();
        int sizeOfData = s.length();
        snprintf(bufpipe, 5, "%04d\0", sizeOfData); 
        char stringToSend[strlen(bufpipe) + sizeOfData +1];
        bzero(stringToSend, sizeof(stringToSend));                  
        strncpy(stringToSend,bufpipe, strlen(bufpipe));         
        strncat(stringToSend,s.c_str(),strlen(s.c_str()));
        strncat(stringToSend, "\0", strlen("\0"));                  
        int fullSize = strlen(stringToSend);            
        signal(SIGPIPE,SIG_IGN);

        fd = open(pipe,O_WRONLY);
        int numWrite = r_write(fd, stringToSend, strlen(stringToSend) );
        cout << errno << endl;
        if(errno == EPIPE)
        {
        signal(SIGPIPE,SIG_IGN);
        }

        if(numWrite != fullSize )
        {               
            signal(SIGPIPE,SIG_IGN);
            bzero(bufpipe, strlen(bufpipe));
            bzero(stringToSend, strlen(stringToSend));
            close(fd);
        }
        else
        {
            signal(SIGPIPE,SIG_IGN);
            sproutFeed.pop();
            close(fd);
            bzero(bufpipe, strlen(bufpipe));
            bzero(stringToSend, strlen(stringToSend));
        }                   
    }
    else
    {
        if(usleep(.0002) == -1)
        {
            perror("sleeping error\n");
        }
    }
}

}

int main(int argc, char *argv[])
{
    signal(SIGPIPE,SIG_IGN);
    int x;
    for(x = 0; x < 100; x++)
    {
        sproutFeed.push("All ships in the sea sink except for that blue one over there, that one never sinks. Most likley because it\'s blue and thats the mightiest colour of ship. Interesting huh?");
    }
    int rc, i , status;
    pthread_t threads[1];       
    printf("Starting Threads...\n");
    pthread_create(&threads[0], NULL, sendData, NULL);
    rc = pthread_join(threads[0], (void **) &status);

}

Вот источник читателя

#define pipe "/tmp/testPipe"

char dataString[50000];
using namespace std;
char *getSproutItem();

void* readItem(void *thread_arg)
{
    while(1)
    {
        x++;
        char *s = getSproutItem();
        if(s != NULL)
        {
            cout << "READ IN: " << s << endl;
        }
    }
}


ssize_t r_read(int fd, char *buf, size_t size) {
   ssize_t retval;
   while (retval = read(fd, buf, size), retval == -1 && errno == EINTR) ;
   return retval;
}


char * getSproutItem()
{
    cout << "Getting item" << endl;
    char stringSize[4];
    bzero(stringSize, sizeof(stringSize));
    int fd = open(pipe,O_RDONLY);
    cout << "Reading" << endl;

    int numread = r_read(fd,stringSize, sizeof(stringSize));


    if(errno == EPIPE)
    {
        signal(SIGPIPE,SIG_IGN);

    }
    cout << "Read Complete" << endl;

    if(numread > 1)
    {

        stringSize[numread] = '\0'; 
        int length = atoi(stringSize);
        char recievedString[length];
        bzero(recievedString, sizeof(recievedString));
        int numread1 = r_read(fd, recievedString, sizeof(recievedString));
        if(errno == EPIPE)
        {


signal(SIGPIPE,SIG_IGN);
    }       
    if(numread1 > 1)
    {
        recievedString[numread1] = '\0';
        cout << "DATA RECIEVED: " << recievedString << endl;
        bzero(dataString, sizeof(dataString));
        strncpy(dataString, recievedString, strlen(recievedString));
        strncat(dataString, "\0", strlen("\0"));
        close(fd);  
        return dataString;
    }
    else
    {
        return NULL;
    }

}
else
{
    return NULL;
}

close(fd);

}

int main(int argc, char *argv[])
{
        int rc, i , status;
        pthread_t threads[1];       
        printf("Starting Threads...\n");
        pthread_create(&threads[0], NULL, readItem, NULL);
        rc = pthread_join(threads[0], (void **) &status); 

}

Ответы [ 4 ]

4 голосов
/ 25 мая 2009

Вы определенно используете сигналы неправильно. Потоки здесь совершенно не нужны - по крайней мере, в предоставленном коде. Строковые вычисления просто странные. Получите эту книгу и не прикасайтесь к клавиатуре, пока не закончите читать :)

0 голосов
/ 25 мая 2009

Общий метод, используемый для отправки данных по именованным каналам, заключается в добавлении заголовка с длиной полезной нагрузки. Затем вы читаете (fd, header_len); читать (rd, data_len); Обратите внимание, что последнее чтение () должно быть выполнено в цикле, пока data_len не будет прочитан или eof. Также обратите внимание, что если у вас есть несколько пишущих в именованный канал, тогда записи являются атомарными (если их размер достаточно разумный) несколько писателей не будут помещать частичные сообщения в буферы ядра.

0 голосов
/ 25 мая 2009

Возможно, вас укусила семантика обработки сигналов потока POSIX в главном потоке читателя. Стандарт POSIX позволяет потоку POSIX получать сигнал, не обязательно ожидаемый поток. Блокировать сигналы там, где не хотел. signal(SIG_PIPE,SIG_IGN) твой друг. Добавьте один к основному читателю.

Семантика обработки потоков POSIX, перевод POS в POSIX. (но это облегчает реализацию потоков POSIX.)

Изучить трубу в / tmp с помощью ls? это не пусто?

0 голосов
/ 25 мая 2009

Сложно сказать, что здесь происходит. Может быть, вы получаете ошибку, возвращенную одним из ваших системных вызовов? Вы уверены, что успешно отправляете все данные?

У вас также есть недопустимый код:

    int length = atoi(stringSize);
    char recievedString[length];

Это синтаксическая ошибка, так как вы не можете создать массив в стеке, используя неконстантное выражение для размера. Может быть, вы используете другой код в вашей реальной версии?

Вам нужно читать данные в цикле? Иногда функция возвращает часть доступных данных и требует, чтобы вы вызывали ее несколько раз, пока не исчезнут все данные.

Некоторые системные вызовы в Unix также могут возвращать EAGAIN, если системный вызов прерван - вы не обрабатываете этот случай по внешнему виду.

...