Создание обработчика очереди / потока отправки в C ++ с каналами: переполнение FIFO - PullRequest
0 голосов
/ 31 января 2019

Потоки ресурсоемки для создания и использования, поэтому часто пул потоков используется повторно для асинхронных задач.Задача упаковывается, а затем «публикуется» в брокере, который поставит задачу в очередь в следующем доступном потоке.

Это идея, стоящая за очередями отправки (т. Е. Apple Central Central Dispatch) и обработчиками потоков (Android-механизм Looper).

Прямо сейчас я пытаюсь свернуть свой собственный.На самом деле, я закрываю пробел в Android, в котором есть API для публикации задач в Java, но не в нативном NDK.Тем не менее, я держу этот вопрос независимо от того, где могу.

Трубы - идеальный выбор для моего сценария.Я могу легко опрашивать файловый дескриптор конца чтения pipe (2) в моем рабочем потоке и ставить задачи из любого другого потока путем записи в конец записи.Вот как это выглядит:

int taskRead, taskWrite;

void setup() {
    // Create the pipe
    int taskPipe[2];
    ::pipe(taskPipe);
    taskRead = taskPipe[0];
    taskWrite = taskPipe[1];

    // Set up a routine that is called when task_r reports new data
    function_that_polls_file_descriptor(taskRead, []() {
        // Read the callback data
        std::function<void(void)>* taskPtr;
        ::read(taskRead, &taskPtr, sizeof(taskPtr));

        // Run the task - this is unsafe! See below.
        (*taskPtr)();

        // Clean up
        delete taskPtr;
    });
}

void post(const std::function<void(void)>& task) {
    // Copy the function onto the heap
    auto* taskPtr = new std::function<void(void)>(task);

    // Write the pointer to the pipe - this may block if the FIFO is full!
    ::write(taskWrite, &taskPtr, sizeof(taskPtr));
}

Этот код помещает std::function в кучу и передает указатель на канал.Затем function_that_polls_file_descriptor вызывает предоставленное выражение, чтобы прочитать канал и выполнить функцию.Обратите внимание, что в этом примере нет проверок безопасности.

Это прекрасно работает в 99% случаев, но есть один существенный недостаток.Трубы имеют ограниченный размер, и если труба заполнена, вызовы post() будут зависать.Само по себе это небезопасно, пока не будет выполнен вызов post() в рамках задачи.

auto evil = []() {
    // Post a new task back onto the queue
    post({});
    // Not enough new tasks, let's make more!
    for (int i = 0; i < 3; i++) {
        post({});
    }

    // Now for each time this task is posted, 4 more tasks will be added to the queue.
});

post(evil);
post(evil);
...

Если это произойдет, то рабочий поток будет заблокирован, ожидая записик трубе.Но FIFO канала заполнен, и рабочий поток ничего не читает из него, поэтому вся система находится в тупике.

Что можно сделать, чтобы гарантировать, что вызовы post() исходят из рабочего потока всегда успешно, что позволяет работнику продолжить обработку очереди в случае ее заполнения?

Ответы [ 3 ]

0 голосов
/ 31 января 2019

Сделайте дескриптор файла записи канала неблокирующим, так что write завершится неудачно с EAGAIN, когда канал заполнен.


Одним из улучшений является увеличение размера буфера канала.

Другой способ - использовать сокет / сокет UNIX и увеличить размер буфера сокета.

Еще одно решение - использовать сокет дейтаграммы UNIX, из которого могут читать многие рабочие потоки, но следующий получает только один.дейтаграммы.Другими словами, вы можете использовать сокет дейтаграммы в качестве диспетчера потоков.

0 голосов
/ 04 февраля 2019

Благодаря всем комментариям и другим ответам в этом посте у меня теперь есть рабочее решение этой проблемы.

Уловка, которую я использовал, состоит в том, чтобы расставить приоритеты рабочих потоков, проверяя, какой поток вызывает post().Вот грубый алгоритм:

pipe ← NON-BLOCKING-PIPE()
overflow ← Ø
POST(task)
    success ← WRITE(task, pipe)
    IF NOT success THEN
        IF THREAD-IS-WORKER() THEN
            overflow ← overflow ∪ {task}
        ELSE
            WAIT(pipe)
            POST(task)

Затем в рабочем потоке:

LOOP FOREVER
    task ← READ(pipe)
    RUN(task)

    FOR EACH overtask ∈ overflow
        RUN(overtask)

    overflow ← Ø

Ожидание выполняется с помощью pselect (2) , адаптированным из ответаby @ Sigismondo.

Вот алгоритм, реализованный в моем исходном примере кода, который будет работать для одного рабочего потока (хотя я не проверял его после копирования-вставки).Его можно расширить для работы с пулом потоков, создав отдельную очередь переполнения для каждого потока.

int taskRead, taskWrite;

// These variables are only allowed to be modified by the worker thread
std::__thread_id workerId;
std::queue<std::function<void(void)>> overflow;
bool overflowInUse;

void setup() {
    int taskPipe[2];
    ::pipe(taskPipe);
    taskRead = taskPipe[0];
    taskWrite = taskPipe[1];

    // Make the pipe non-blocking to check pipe overflows manually
    ::fcntl(taskWrite, F_SETFL, ::fcntl(taskWrite, F_GETFL, 0) | O_NONBLOCK);

    // Save the ID of this worker thread to compare later
    workerId = std::this_thread::get_id();
    overflowInUse = false;

    function_that_polls_file_descriptor(taskRead, []() {
        // Read the callback data
        std::function<void(void)>* taskPtr;
        ::read(taskRead, &taskPtr, sizeof(taskPtr));

        // Run the task
        (*taskPtr)();
        delete taskPtr;

        // Run any tasks that were posted to the overflow
        while (!overflow.empty()) {
            taskPtr = overflow.front();
            overflow.pop();

            (*taskPtr)();
            delete taskPtr;
        }

        // Release the overflow mechanism if applicable
        overflowInUse = false;
    });
}

bool write(std::function<void(void)>* taskPtr, bool blocking = true) {
    ssize_t rc = ::write(taskWrite, &taskPtr, sizeof(taskPtr));

    // Failure handling
    if (rc < 0) {
        // If blocking is allowed, wait for pipe to become available
        int err = errno;
        if ((errno == EAGAIN || errno == EWOULDBLOCK) && blocking) {
            fd_set fds;
            FD_ZERO(&fds);
            FD_SET(taskWrite, &fds);

            ::pselect(1, nullptr, &fds, nullptr, nullptr, nullptr);

            // Try again
            return write(tdata);
        }

        // Otherwise return false
        return false;
    }

    return true;
}

void post(const std::function<void(void)>& task) {
    auto* taskPtr = new std::function<void(void)>(task);

    if (std::this_thread::get_id() == workerId) {
        // The worker thread gets 1st-class treatment.
        // It won't be blocked if the pipe is full, instead
        // using an overflow queue until the overflow has been cleared.
        if (!overflowInUse) {
            bool success = write(taskPtr, false);
            if (!success) {
                overflow.push(taskPtr);
                overflowInUse = true;
            }
        } else {
            overflow.push(taskPtr);
        }
    } else {
        write(taskPtr);
    }
}
0 голосов
/ 31 января 2019

Вы можете использовать старый добрый выберите , чтобы определить, готовы ли файловые дескрипторы для использования для записи:

Будут отслеживаться файловые дескрипторы в writefds, чтобы увидеть,пространство доступно для записи (хотя большая запись все еще может блокироваться).

Поскольку вы пишете указатель, ваш write() вообще не может быть классифицирован как большой.

Ясновы должны быть готовы справиться с тем фактом, что сообщение может потерпеть неудачу, и затем быть готовым повторить его позже ... в противном случае вы будете сталкиваться с бесконечно растущими трубами, пока ваша система не сломается снова.

Более или менее (не проверено):

bool post(const std::function<void(void)>& task) {
    bool post_res = false;

    // Copy the function onto the heap
    auto* taskPtr = new std::function<void(void)>(task);

    fd_set wfds;
    struct timeval tv;
    int retval;

    FD_ZERO(&wfds);
    FD_SET(taskWrite, &wfds);

    // Don't wait at all
    tv.tv_sec = 0;
    tv.tv_usec = 0;

    retval = select(1, NULL, &wfds, NULL, &tv);
    // select() returns 0 when no FD's are ready
    if (retval == -1) {
      // handle error condition
    } else if (retval > 0) {
      // Write the pointer to the pipe. This write will succeed
      ::write(taskWrite, &taskPtr, sizeof(taskPtr));
      post_res = true;
    }
    return post_res;
}
...