данные не синхронизируются при попытке реализовать модель производителя, потребителя, кольцевого буфера - PullRequest
0 голосов
/ 20 ноября 2018

Правильная ссылка на код: https://wandbox.org/permlink/JYr2XoaSxsS1QT14


Из-за этого я несколько дней застрял .. Я пытался создать модель «производитель-> кольцевой буфер-> потребитель».Сначала я использовал mutex, чтобы он работал, но он не асинхронный.Я хочу, чтобы потребитель продолжал читать без каких-либо остановок , точно так же, как видеоплеер, что, я думаю, я не могу сделать с помощью mutex.

Вот что я сделал:

  1. Я создаю структуру FIFO на основе динамического массива фиксированного размера.(кольцевой буфер)
  2. У меня есть пара указателей для операций push_right и pop_left.Так что проблем с передачей данных не будет, если я правильно понимаю.
  3. Я заставляю производителя написать несколько элементов впереди, тогда потребитель начинает читать, и ему необходимо убедиться, что:
    • потребитель читаетскорость <= скорость записи производителя (указатель чтения потребителя <указатель записи производителя) * </li>
    • Подходящий фиксированный размер массива, чтобы производитель не перекрывал элементы, которые потребитель не прочитал.

Моя проблема в том, что результат вывода не такой, как я ожидал, он не синхронизирован .. И я не знаю, как это отладить.

Вы можете увидеть данныепорядок записи (P ostream :) не совпадает с порядком чтения (O ostream:).

Один возможный вывод:

data size: 20
70 927 156 109 834 26 883 576 226 500 904 777 935 80 346 559 846 879 548 791 
********************
Consumer start working
791
548
879
846
26
346
109
156
927
70
500
226
576
883
26
834
109
156
927
70
Consumer done
********************
p_count: 20
c_count: 20
P ostream: 791 548 879 846 559 346 80 935 777 904 500 226 576 883 26 834 109 156 927 70 
C ostream: 791 548 879 846 26 346 109 156 927 70 500 226 576 883 26 834 109 156 927 70 

Код:

(он содержит main.cpp, circular_array и файл данных txt, поэтому я думаю, что посещение ссылки будет менее болезненным)

https://wandbox.org/permlink/ddQjNFdxABrjminQ


main.cpp

#include <iostream>
#include <thread>
#include <chrono>
#include <vector>
#include <fstream>
#include <sstream>
#include "circular_array.h"
using namespace ythlearn;
using namespace std;

int p_count = 0;
int c_count = 0;
int fileSize = 0;
ostringstream p_os, c_os;

void producer(CircularArray<int>* Ca, vector<int> &data){
    for(int i = 0; i < 5; i++){
        p_os << data.back() << " ";
        Ca->push_right(data.back());
        data.pop_back();
        p_count++;
    }
    while(!data.empty()){
        this_thread::sleep_for(chrono::seconds(1));    
        p_os << data.back() << " ";
        Ca->push_right(data.back());
        data.pop_back();
        p_count++;
    }

}

void consumer(CircularArray<int>* Ca){
    cout << "********************" << endl;
    cout << "Consumer start working" << endl;
    this_thread::sleep_for(chrono::seconds(5));
    while(c_count < fileSize){
        this_thread::sleep_for(chrono::seconds(1));    
        int re = Ca->pop_left();
        cout << re << endl;
        c_os << re << " ";
        c_count++;

    }
    cout << "Consumer done" << endl;
    cout << "********************" << endl;

}


void getInput(vector<int>& data){
    ifstream ifs("test.txt");
    int j;
    while(ifs >> j){
        data.push_back(j);
    }
}

int main(){
    cout << unitbuf;
    vector<int> data;
    getInput(data);

    CircularArray<int> Ca;

    ::fileSize = data.size();
    cout << "data size: " << ::fileSize << endl;
    for(const auto& s: data){
        cout << s << " ";
    }
    cout << endl;


    thread th_producer(producer, &Ca, std::ref(data));
    thread th_consumer(consumer, &Ca);

    th_consumer.join();
    th_producer.join();
    cout << "p_count: " << p_count << endl
         << "c_count: " << c_count << endl;
    cout << "P ostream: " << p_os.str() << endl;
    cout << "C ostream: " << c_os.str() << endl;
    return 0;
}

circular_array.h

#pragma once
#include <stdexcept>
#include <iostream>
namespace ythlearn{
    template<typename T>
    class CircularArray{
        public:
            CircularArray(int N = 10){
                head = tail = new T[N];
                past_end_ptr1 = past_end_ptr2 = head + N;
                start_ptr1 = start_ptr2 = head;
                _capacity = N;
                _size = 0;
            }

            void push_right(T elem){
                *tail = elem;
                if(tail + 1 == past_end_ptr1){
                    tail = start_ptr1;
                }else{
                    tail++;
                }
            }

            T pop_left(){
                T re = *head;
                if(head + 1 == past_end_ptr2){
                    head = start_ptr2;
                }else{
                    head++;
                }
                return re;
            }

            CircularArray& operator=(const CircularArray&) = delete;
            CircularArray(const CircularArray&) = delete;
            ~CircularArray(){
                delete[] start_ptr1;
            }

        private:
            T* head;
            T* tail;
            T* start_ptr1, *start_ptr2;
            T* past_end_ptr1, *past_end_ptr2;
            int _capacity;
            int _size;
    };
}

test.txt

70
927
156
109
834
26
883
576
226
500
904
777
935
80
346
559
846
879
548
791

1 Ответ

0 голосов
/ 26 ноября 2018

Короткая версия: Проблема заключается в том, что не реализована третья вещь, которую вы сказали, что сделали («указатель чтения потребителя <указатель записи производителя» и «производитель не будет переопределять элементы, которые есть у потребителя)читаю ").В частности, не проверять перезапись проблематично.Хороший план, не очень хорошее исполнение. </p>

Подробнее: Вы никогда не проверяете, заполнен ли круговой массив.Поскольку ваш тестовый сценарий находится на границе между массивом, который заполнен, и массивом переполненным, вы в конечном итоге столкнетесь с состоянием гонки.Иногда производитель перезаписывает начало массива до того, как его прочитает потребитель.

Вот временная шкала (измеряется в секундах):

0: Производитель записывает 5 значений вмассив.
1: Producer записывает в массив 6-е значение.
2: Producer записывает в массив 7-е значение.
3: Producer записывает в массив 8-е значение.
4: Producer записывает в массив 9-е значение.
5: Producer записывает 10-е значение вмассив (теперь он заполнен), и потребитель начинает свой цикл (но далеко не уходит, поскольку первый шаг в каждой итерации спит секунду).

5 + n: Producer записывает значение в массив, а Consumer читает значение из массива.Если источник идет первым, размер массива временно увеличивается до 11, что превышает его емкость.

Посмотрите на места, где Потребитель читает что-то отличное от того, что написал Продюсер.Сравните то, что прочитал Потребитель, с тем, что Продюсер написал 10 шагов спустя.


Теперь для общей критики, которую никто не спрашивал.

Есть два аспекта вашей реализации кругового массива, которые выглядят странно/неправильно.Во-первых, существует дублирование хранилища, в котором вы поддерживаете два идентичных указателя начала и два идентичных указателя конца-конца.Во-вторых, похоже, что _size всегда 0, что кажется бесполезным.

Один аспект, который не обязательно неправильный, но может быть неправдоподобным, - это то, как вы указываете N.Рассматривали ли вы сделать N параметром шаблона, похожим на то, что было сделано для std::array?Это может уменьшить объем используемой памяти (не нужно хранить _capacity) и устранить необходимость динамического управления памятью.

Приложение:
Мне пришло в голову, что у вас есть данныеэлементы, которые не должны быть изменены после создания, но которые не помечены const.Возможно, вы захотите решить эту проблему, тем более что пометка их const прояснит, что они не могут быть вовлечены в состояние гонки.Таким образом, вы могли бы объявить члены данных циклического массива более похожими на:

private:
    int const _capacity;
    T * const start;
    T * const past_end;
    T* head;
    T* tail;

Тогда конструктор мог бы быть больше похож на:

CircularArray(int N = 10) :
    _capacity(N),
    start(new T[N]),
    past_end(start + N)
{
    head = tail = start;
}

(На самом деле, вы можете использовать список инициализатора для всехчлены; я просто думал, что небольшие изменения будут более удобочитаемыми.) Еще одно преимущество стиля в том, что оба параметра new и delete будут применяться к start, что выглядит лучше для тех, кто проверяет код.

Еще одним упрощением может быть использование выражения start + N везде, где вы использовали past_end.Разница в производительности должна быть незначительной, и вы уменьшите объем используемой памяти.

В качестве альтернативы, мое раннее предложение сделать N параметром шаблона сделало бы вопрос const спорным.Используя N в качестве параметра шаблона (все еще со значением по умолчанию 10), члены данных циклического массива могут быть просто:

private:
    T start[N]; // As a template parameter, N is a compile-time constant
    T* head; // Or an index into the array
    T* tail; // Or an index into the array

Переключение на индексы также дает больше оснований отбрасывать элемент конца строки,Индекс конца-конца становится N, который является постоянной времени компиляции - нет необходимости тратить на него место для хранения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...