Стандартный термин для буфера переупорядочения потока ввода / вывода? - PullRequest
4 голосов
/ 29 мая 2010

У меня есть случай, когда все потоки одновременно генерируют данные, которые в конечном итоге записываются в один длинный последовательный поток файл . Мне нужно как-то сериализовать эти записи, чтобы поток записывался в правильном порядке.

то есть , у меня есть очередь ввода 2048 заданий j 0 .. j n , каждое из которых создает кусок данных o я . Задания выполняются параллельно, скажем, на восьми потоках, но выходные блоки должны появляться в потоке в том же порядке, что и соответствующие входные блоки & mdash; выходной файл должен быть в следующем порядке: o 0 o 1 o 2 ...

Решение этого вполне очевидно: мне нужен какой-то буфер, который накапливает и записывает выходные блоки в правильном порядке, аналогично буферу переупорядочения ЦП в Алгоритм Томасуло , или как что TCP собирает неупорядоченные пакеты перед передачей их на уровень приложений.

Прежде чем приступить к написанию кода, я хотел бы сделать быстрый поиск литературы, чтобы увидеть, есть ли какие-либо документы, которые решают эту проблему особенно умным или эффективным способом, поскольку у меня есть серьезные ограничения в реальном времени и памяти. Я не могу найти какие-либо документы, описывающие это, хотя; Поиск по всем перестановкам [потоков, одновременных, буфер переупорядочения, повторной сборки, ввода-вывода, сериализации] не дал ничего полезного. Мне кажется, что я просто не ищу правильные термины.

Существует ли общее академическое имя или ключевое слово для такого типа шаблона, по которому я могу искать?

Ответы [ 5 ]

1 голос
/ 16 июля 2010

Книга Enterprise Integration Patterns называет это Resequencer (p282 / web ).

0 голосов
/ 16 июля 2010

Выходная очередь содержит фьючерсы , а не фактические данные. Когда вы извлекаете элемент из входной очереди, немедленно отправьте соответствующее будущее в выходную очередь (следя за тем, чтобы это сохраняло порядок - см. Ниже). Когда рабочий поток обработал элемент, он может установить значение на будущее. Выходной поток может читать каждое будущее из очереди и блокировать, пока это будущее не будет готово. Если более поздние будут готовы раньше, это никак не повлияет на выходной поток, если фьючерсы в порядке.

Есть два способа убедиться, что фьючерсы в выходной очереди находятся в правильном порядке. Первый - использовать один мьютекс для чтения из входной очереди и записи в выходную очередь. Каждый поток блокирует мьютекс, берет элемент из входной очереди, публикует будущее в выходной очереди и освобождает мьютекс.

Второй - иметь один главный поток, который читает из входной очереди, публикует будущее в выходной очереди, а затем передает элемент в рабочий поток для выполнения.

В C ++ с одним мьютексом, защищающим очереди, это будет выглядеть так:

#include <thread>
#include <mutex>
#include <future>

struct work_data{};
struct result_data{};

std::mutex queue_mutex;
std::queue<work_data> input_queue;
std::queue<std::future<result_data> > output_queue;

result_data process(work_data const&); // do the actual work

void worker_thread()
{
    for(;;) // substitute an appropriate termination condition
    {
        std::promise<result_data> p;
        work_data data;
        {
            std::lock_guard<std::mutex> lk(queue_mutex);
            if(input_queue.empty())
            {
                continue;
            }
            data=input_queue.front();
            input_queue.pop();
            std::promise<result_data> item_promise;
            output_queue.push(item_promise.get_future());
            p=std::move(item_promise);
        }
        p.set_value(process(data));
    }
}

void write(result_data const&); // write the result to the output stream

void output_thread()
{
    for(;;) // or whatever termination condition
    {
        std::future<result_data> f;
        {
            std::lock_guard<std::mutex> lk(queue_mutex);
            if(output_queue.empty())
            {
                continue;
            }
            f=std::move(output_queue.front());
            output_queue.pop();
        }
        write(f.get());
    }
}
0 голосов
/ 30 мая 2010

Лично я бы не использовал перезаписываемый буфер. Я бы создал один объект «задание» на задание и, в зависимости от вашей среды, использовал бы передачу сообщений или мьютексы для получения завершенных данных от каждого задания по порядку. Если следующее задание не выполнено, процесс «писатель» ждет, пока он не будет выполнен.

0 голосов
/ 16 июня 2010

Я бы использовал кольцевой буфер, длина которого совпадает с количеством используемых вами потоков.Кольцевой буфер также будет иметь такое же количество мьютексов.

rinbuffer также должен знать id последнего фрагмента, который он записал в файл.Он эквивалентен индексу 0 вашего кольцевого буфера.

При добавлении в кольцевой буфер вы проверяете, можете ли вы записать, т. Е. Установлен индекс 0, после чего вы можете записывать более одного фрагмента за раз в файл.

Если индекс 0 не установлен, просто заблокируйте текущий поток для ожидания.- Вы также можете иметь кольцевой буфер в 2-3 раза длиннее, чем количество потоков, и блокировать его только при необходимости, т. Е. Когда было запущено достаточно заданий для заполнения буфера.

Не забудьте обновитьпоследний кусок записан жестко;)

Вы также можете использовать двойную буферизацию при записи в файл.

0 голосов
/ 29 мая 2010

На самом деле вам не нужно накапливать куски. Большинство операционных систем и языков предоставляют абстракцию файла с произвольным доступом, которая позволяет каждому потоку независимо записывать свои выходные данные в правильную позицию в файле, не влияя на выходные данные любого из других потоков.

Или вы пишете в действительно последовательный выходной файл, как сокет?

...