В моем приложении у меня есть два потока: производитель (поток 1) и потребитель (поток 2). Каждый поток имеет интерфейс ввода и вывода (фактически указатель на список), который подключен к третьему потоку, который служит маршрутизатором.
Когда производитель пишет, он вызывает memcpy для копирования данных в буфер и помещает буфер в список. Тем временем поток маршрутизатора выполняет циклический поиск по всем потокам, которые к нему подключены, и отслеживает их интерфейсы, чтобы узнать, есть ли у какого-либо потока данные для отправки. Когда он видит, что список потока 1 не пуст, он проверяет, для какого потока предназначены данные. Данные вставляются во входной список целевого потока (в данном случае потока 2), после чего поток 2 выделяет c некоторую память, сохраняет данные в нее и возвращает указатель на эту новую область.
В своем тесте я измеряю пропускную способность, чтобы узнать, сколько времени уходит на отправку 100 тыс. Сообщений разного размера. Поток 1 отправляет данные определенного размера, поток 2 читает их и отправляет небольшое ответное сообщение, которое читает поток 1. Это будет один полный обмен. В первом тесте в потоке 1 я отправляю все 100 тысяч сообщений, а затем читаю 100 тысяч ответов. Во втором тесте в потоке 1 я попеременно отправляю сообщение и жду ответа и повторяю 100 тысяч раз. В обоих тестах поток 2 находится в al oop, читая сообщение и отправляя ответ. Я ожидал, что тест 1 будет иметь более высокую пропускную способность, потому что потоки должны тратить меньше времени на ожидание. Однако его пропускная способность заметно ниже, чем у теста 2. Я измерил, сколько времени занимают отдельные вызовы функций (для чтения / записи) в двух тестовых случаях, и они неизменно занимают больше времени в тесте 1 (на основе средних и медиан и отсутствия задержки. ), хотя числа имеют тот же порядок величины.
Когда я добавляю al oop ничего не делающего в поток 1, отправляющий l oop в тесте 1, я вижу значительно улучшенную пропускную способность для этого случая по сравнению с без задержки. Мое единственное предположение состоит в том, что добавление задержки замедляет работу производителя, чтобы потребитель мог поглощать данные, что не позволяет его входному списку становиться очень большим. Мне интересно, могут ли быть другие объяснения, и если да, то как я могу их проверить.
Edit
К сожалению, мой собственный код - это просто тест, который я описал выше, который вызывает библиотеку, которая фактически выполняет чтение / запись, создает этот третий поток et c. Сложно сделать из этого минимальный пример, потому что библиотека сложная и не моя. Я привожу псевдокод, чтобы проиллюстрировать настройку более подробно.
int NUM_ITERATIONS = 100000;
int msg_reply = 2; // size of the reply message in words
int msg_size = 512; // indicates 512 64 bit words
void generate(int iterations, int size, interface* out){
std::vector<long long> vec(size);
for(int i = 0; i < size; i++)
vec[i] = (long long) i;
for(int i = 0; i < iterations; i++)
out->lib_write((char*) vec.data(), size);
}
void receive(int iterations, int size, interface* in){
for(int i = 0; i < iterations; i++)
char* data = in->lib_read(size)
void producer(interface* in, interface* out){
// test 1
start = std::chrono::high_resolution_clock::now();
// write data of size msg_size, NUM_ITERATIONS times to out
generate(NUM_ITERATIONS, msg_size, out);
// read data of size msg_reply, NUM_ITERATIONS times from in
receive(NUM_ITERATIONS, msg_reply, in);
end = std::chrono::high_resolution_clock::now();
// using NUM_ITERATIONS, msg_size and time, compute and print throughput to stdout
print_throughput(end-start, "throughput_0", msg_size);
// test 2
start = std::chrono::high_resolution_clock::now();
for(int j = 0; j < NUM_ITERATIONS; j++){
generate(1, msg_size, out);
receive(1, msg_reply, in);
}
end = std::chrono::high_resolution_clock::now();
print_throughput(end-start, "throughput_1", msg_size);
}
void consumer(interface* in, interface* out){
for(int i = 0; i < 2; i++}{
for(int j = 0; j < NUM_ITERATIONS; j++){
receive(1, msg_size, in);
generate(1, msg_reply, out);
}
}
}
Вызовы lib_write()
и lib_read()
становятся довольно сложными. Чтобы уточнить приведенное выше описание, данные помещаются в буфер, а затем перемещаются в список. Интерфейс имеет член условной переменной, а запись вызывает его метод notify_one()
. Третий поток перебирает все имеющиеся у него указатели интерфейсов и проверяет, не являются ли их списки пустыми. Если это так, данные объединяются из одного выходного списка в целевой входной список с помощью метода splice()
в std :: list. Между тем, потребитель вызывает lib_read()
, который ожидает переменной условия, пока интерфейс пуст, а затем memcpy переносит данные в новую область и возвращает их.
// note: these will not compile as is. Undefined variables are class members
char * interface::lib_read(size_t * _size){
char * ret;
{
std::unique_lock<std::mutex> lock(mutex);
// packets is an std::list containing the incoming data
while (packets.empty()) {
cv.wait(lock);
}
curr_read_it = packets.begin();
}
size_t buff_size = curr_read_it->size;
ret = (char *)malloc(buff_size);
memcpy((char *)ret, (char *)curr_read_it->data, buff_size);
{
std::unique_lock<std::mutex> lock(mutex);
packets.erase(curr_read_it);
curr_read_it = packets.end();
}
return ret;
}
void interface::lib_write(char * data, int size){
// indicates the destination thread id
long long header = 1;
// buffer is a just an array that's max packet sized
memcpy((char *)buffer.data, &header, sizeof(long long));
memcpy((char *)buffer.data + sizeof(long long), (char *)data, size * sizeof(long long));
std::lock_guard<std::mutex> guard(mutex);
packets.push_back(std::move(buffer));
cv.notify_one();
}
// this is on thread 3
void route(){
do{
// this is a vector containing all the "out" interfaces
for(int i = 0; i < out_ptrs.size(); i++){
interface <long long> * _out = out_ptrs[i];
if(!_out->empty()){
// this just returns the header id (also locks the mutex)
long long dest= _out->get_dest();
// looks up the correct interface based on the id and splices
// a packet into from _out to the appropriate one. Locks mutex
in_ptrs[dest_map[dest]]->splice(_out);
}
}
}while(!done());