У меня есть «серверная» программа, которая обновляет множество связанных списков в общей памяти в ответ на внешние события. Я хочу, чтобы клиентские программы замечали обновления в любом из списков как можно быстрее (минимальная задержка). Сервер помечает узел связанного списка state_
как FILLED
, как только его данные заполнены, а его следующий указатель установлен в правильное местоположение. До этого его state_
составляет NOT_FILLED_YET
. Я использую барьеры памяти, чтобы убедиться, что клиенты не видят state_
как FILLED
до того, как данные в действительности будут готовы (и, похоже, это работает, я никогда не вижу поврежденных данных). Кроме того, state_
является изменчивым, чтобы убедиться, что компилятор не поднимает проверку клиента из циклов.
Сохраняя код сервера точно таким же, я предложил 3 различных метода для клиента, чтобы сканировать связанные списки на предмет изменений. Вопрос в том, почему третий метод самый быстрый?
Метод 1: циклически перебирать все связанные списки (называемые «каналами») непрерывно, проверяя, изменились ли какие-либо узлы на «ЗАПОЛНЕНО»:
void method_one()
{
std::vector<Data*> channel_cursors;
for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
{
Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
channel_cursors.push_back(current_item);
}
while(true)
{
for(std::size_t i = 0; i < channel_list.size(); ++i)
{
Data* current_item = channel_cursors[i];
ACQUIRE_MEMORY_BARRIER;
if(current_item->state_ == NOT_FILLED_YET) {
continue;
}
log_latency(current_item->tv_sec_, current_item->tv_usec_);
channel_cursors[i] = static_cast<Data*>(current_item->next_.get(segment));
}
}
}
Метод 1 дал очень низкую задержку, когда число каналов было небольшим. Но когда количество каналов выросло (250K +), оно стало очень медленным из-за зацикливания на всех каналах. Итак, я попытался ...
Способ 2: присвойте каждому связанному списку идентификатор. Держите отдельный «список обновлений» в стороне. Каждый раз, когда один из связанных списков обновляется, вставляйте его идентификатор в список обновлений. Теперь нам просто нужно отслеживать один список обновлений и проверять идентификаторы, которые мы получаем из него.
void method_two()
{
std::vector<Data*> channel_cursors;
for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
{
Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
channel_cursors.push_back(current_item);
}
UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));
while(true)
{
ACQUIRE_MEMORY_BARRIER;
if(update_cursor->state_ == NOT_FILLED_YET) {
continue;
}
::uint32_t update_id = update_cursor->list_id_;
Data* current_item = channel_cursors[update_id];
if(current_item->state_ == NOT_FILLED_YET) {
std::cerr << "This should never print." << std::endl; // it doesn't
continue;
}
log_latency(current_item->tv_sec_, current_item->tv_usec_);
channel_cursors[update_id] = static_cast<Data*>(current_item->next_.get(segment));
update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
}
}
Метод 2 дал ужасную задержку. В то время как метод 1 может дать задержку до 10 °, метод 2 необъяснимо часто дает задержку в 8 мс! Используя gettimeofday, кажется, что изменение в update_cursor-> state_ было очень медленным, чтобы распространяться от представления сервера к представлению клиента (я нахожусь на многоядерной коробке, поэтому я предполагаю, что задержка вызвана кешем). Поэтому я попробовал гибридный подход ...
Способ 3: сохранить список обновлений. Но постоянно проходите по всем каналам, и в каждой итерации проверяйте, обновлялся ли список обновлений. Если это так, идите с номером на нем. Если это не так, проверьте канал, к которому мы в настоящее время перешли.
void method_three()
{
std::vector<Data*> channel_cursors;
for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
{
Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
channel_cursors.push_back(current_item);
}
UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));
while(true)
{
for(std::size_t i = 0; i < channel_list.size(); ++i)
{
std::size_t idx = i;
ACQUIRE_MEMORY_BARRIER;
if(update_cursor->state_ != NOT_FILLED_YET) {
//std::cerr << "Found via update" << std::endl;
i--;
idx = update_cursor->list_id_;
update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
}
Data* current_item = channel_cursors[idx];
ACQUIRE_MEMORY_BARRIER;
if(current_item->state_ == NOT_FILLED_YET) {
continue;
}
found_an_update = true;
log_latency(current_item->tv_sec_, current_item->tv_usec_);
channel_cursors[idx] = static_cast<Data*>(current_item->next_.get(segment));
}
}
}
Задержка этого метода была такой же, как и у метода 1, но масштабировалась на большое количество каналов. Проблема в том, что я понятия не имею, почему. Просто для того, чтобы разбираться с вещами: если я раскомментирую часть «найдено через обновление», она будет печататься между КАЖДЫМ СООБЩЕНИЕМ В ЖУРНАЛЕ LATENCY. Что означает, что вещи только когда-либо найдены в списке обновлений! Поэтому я не понимаю, как этот метод может быть быстрее, чем метод 2.
Полный скомпилируемый код (требуется GCC и boost-1.41), который генерирует случайные строки в качестве тестовых данных по адресу: http://pastebin.com/0kuzm3Uf
Обновление: все 3 метода эффективно блокируются, пока не произойдет обновление. Разница в том, сколько времени им требуется, чтобы заметить, что произошло обновление. Они все постоянно нагружают процессор, так что это не объясняет разницу в скорости. Я тестирую на 4-ядерном компьютере, на котором больше ничего не работает, поэтому серверу и клиенту не с чем конкурировать. Я даже сделал версию кода, где обновления сигнализируют о состоянии и заставляют клиентов ждать условия - это не помогло задержке любого из методов.
Обновление 2: Несмотря на то, что существует 3 метода, я пробовал только 1 за раз, поэтому только 1 сервер и 1 клиент соревнуются за элемент state_.