Почему добавление задержки улучшило бы пропускную способность данных в этой многопоточной среде? - PullRequest
0 голосов
/ 26 мая 2020

В моем приложении у меня есть два потока: производитель (поток 1) и потребитель (поток 2). Каждый поток имеет интерфейс ввода и вывода (фактически указатель на список), который подключен к третьему потоку, который служит маршрутизатором.

Когда производитель пишет, он вызывает memcpy для копирования данных в буфер и помещает буфер в список. Тем временем поток маршрутизатора выполняет циклический поиск по всем потокам, которые к нему подключены, и отслеживает их интерфейсы, чтобы узнать, есть ли у какого-либо потока данные для отправки. Когда он видит, что список потока 1 не пуст, он проверяет, для какого потока предназначены данные. Данные вставляются во входной список целевого потока (в данном случае потока 2), после чего поток 2 выделяет c некоторую память, сохраняет данные в нее и возвращает указатель на эту новую область.

В своем тесте я измеряю пропускную способность, чтобы узнать, сколько времени уходит на отправку 100 тыс. Сообщений разного размера. Поток 1 отправляет данные определенного размера, поток 2 читает их и отправляет небольшое ответное сообщение, которое читает поток 1. Это будет один полный обмен. В первом тесте в потоке 1 я отправляю все 100 тысяч сообщений, а затем читаю 100 тысяч ответов. Во втором тесте в потоке 1 я попеременно отправляю сообщение и жду ответа и повторяю 100 тысяч раз. В обоих тестах поток 2 находится в al oop, читая сообщение и отправляя ответ. Я ожидал, что тест 1 будет иметь более высокую пропускную способность, потому что потоки должны тратить меньше времени на ожидание. Однако его пропускная способность заметно ниже, чем у теста 2. Я измерил, сколько времени занимают отдельные вызовы функций (для чтения / записи) в двух тестовых случаях, и они неизменно занимают больше времени в тесте 1 (на основе средних и медиан и отсутствия задержки. ), хотя числа имеют тот же порядок величины.

Когда я добавляю al oop ничего не делающего в поток 1, отправляющий l oop в тесте 1, я вижу значительно улучшенную пропускную способность для этого случая по сравнению с без задержки. Мое единственное предположение состоит в том, что добавление задержки замедляет работу производителя, чтобы потребитель мог поглощать данные, что не позволяет его входному списку становиться очень большим. Мне интересно, могут ли быть другие объяснения, и если да, то как я могу их проверить.

Edit

К сожалению, мой собственный код - это просто тест, который я описал выше, который вызывает библиотеку, которая фактически выполняет чтение / запись, создает этот третий поток et c. Сложно сделать из этого минимальный пример, потому что библиотека сложная и не моя. Я привожу псевдокод, чтобы проиллюстрировать настройку более подробно.

int NUM_ITERATIONS = 100000;
int msg_reply = 2; // size of the reply message in words
int msg_size = 512; // indicates 512 64 bit words

void generate(int iterations, int size, interface* out){
    std::vector<long long> vec(size);
    for(int i = 0; i < size; i++)
        vec[i] = (long long) i;

    for(int i = 0; i < iterations; i++)
        out->lib_write((char*) vec.data(), size);
}

void receive(int iterations, int size, interface* in){
    for(int i = 0; i < iterations; i++)
        char* data = in->lib_read(size)

void producer(interface* in, interface* out){
    // test 1
    start = std::chrono::high_resolution_clock::now();
    // write data of size msg_size, NUM_ITERATIONS times to out
    generate(NUM_ITERATIONS, msg_size, out);
    // read data of size msg_reply, NUM_ITERATIONS times from in
    receive(NUM_ITERATIONS, msg_reply, in);
    end = std::chrono::high_resolution_clock::now();
    // using NUM_ITERATIONS, msg_size and time, compute and print throughput to stdout
    print_throughput(end-start, "throughput_0", msg_size);

    // test 2
    start = std::chrono::high_resolution_clock::now();
    for(int j = 0; j < NUM_ITERATIONS; j++){
        generate(1, msg_size, out);
        receive(1, msg_reply, in);
    }
    end = std::chrono::high_resolution_clock::now();
    print_throughput(end-start, "throughput_1", msg_size);
}

void consumer(interface* in, interface* out){
    for(int i = 0; i < 2; i++}{
        for(int j = 0; j < NUM_ITERATIONS; j++){
            receive(1, msg_size, in);
            generate(1, msg_reply, out);
        }
    }
}

Вызовы lib_write() и lib_read() становятся довольно сложными. Чтобы уточнить приведенное выше описание, данные помещаются в буфер, а затем перемещаются в список. Интерфейс имеет член условной переменной, а запись вызывает его метод notify_one(). Третий поток перебирает все имеющиеся у него указатели интерфейсов и проверяет, не являются ли их списки пустыми. Если это так, данные объединяются из одного выходного списка в целевой входной список с помощью метода splice() в std :: list. Между тем, потребитель вызывает lib_read(), который ожидает переменной условия, пока интерфейс пуст, а затем memcpy переносит данные в новую область и возвращает их.

// note: these will not compile as is. Undefined variables are class members

char * interface::lib_read(size_t * _size){

    char * ret;
    {
        std::unique_lock<std::mutex> lock(mutex);
        // packets is an std::list containing the incoming data
        while (packets.empty()) {
            cv.wait(lock);
        }
        curr_read_it = packets.begin();
    }

    size_t buff_size = curr_read_it->size;

    ret = (char *)malloc(buff_size);
    memcpy((char *)ret, (char *)curr_read_it->data, buff_size);
    {
        std::unique_lock<std::mutex> lock(mutex);
        packets.erase(curr_read_it);
        curr_read_it = packets.end();
    }
    return ret;
}

void interface::lib_write(char * data, int size){
    // indicates the destination thread id
    long long header = 1;

    // buffer is a just an array that's max packet sized
    memcpy((char *)buffer.data, &header, sizeof(long long));
    memcpy((char *)buffer.data + sizeof(long long), (char *)data, size * sizeof(long long));

    std::lock_guard<std::mutex> guard(mutex);
    packets.push_back(std::move(buffer));
    cv.notify_one();
}

// this is on thread 3
void route(){
    do{
        // this is a vector containing all the "out" interfaces
        for(int i = 0; i < out_ptrs.size(); i++){
            interface <long long> * _out = out_ptrs[i];
            if(!_out->empty()){
                // this just returns the header id (also locks the mutex)
                long long dest= _out->get_dest();
                // looks up the correct interface based on the id and splices
                // a packet into from _out to the appropriate one. Locks mutex
                in_ptrs[dest_map[dest]]->splice(_out);   
            }
        }

    }while(!done());

1 Ответ

0 голосов
/ 28 мая 2020

Я искал общие советы о том, какие факторы могут влиять на производительность многопоточности и что нужно тестировать, чтобы лучше понять, что происходит.

Я разговаривал с некоторыми другими людьми, и совет, который я получил, был Полезно было определить, было ли проблемой расписание ОС (это то, что я подозревал, но не знал, как проверить). По сути, я использовал taskset и sched_affinity(), чтобы заставить приложение работать на одном ядре или на подмножестве ядер, и смотрел, как они сравниваются друг с другом и с неограниченным случаем.

На основе ограничений, я получил совершенно разные результаты и смог увидеть некоторые тенденции, поэтому я вполне уверен, что могу сказать, что это проблема планирования ОС. Различные из них могут дать лучшую производительность при разных рабочих нагрузках.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...