Потоки ресурсоемки для создания и использования, поэтому часто пул потоков используется повторно для асинхронных задач.Задача упаковывается, а затем «публикуется» в брокере, который поставит задачу в очередь в следующем доступном потоке.
Это идея, стоящая за очередями отправки (т. Е. Apple Central Central Dispatch) и обработчиками потоков (Android-механизм Looper).
Прямо сейчас я пытаюсь свернуть свой собственный.На самом деле, я закрываю пробел в Android, в котором есть API для публикации задач в Java, но не в нативном NDK.Тем не менее, я держу этот вопрос независимо от того, где могу.
Трубы - идеальный выбор для моего сценария.Я могу легко опрашивать файловый дескриптор конца чтения pipe (2) в моем рабочем потоке и ставить задачи из любого другого потока путем записи в конец записи.Вот как это выглядит:
int taskRead, taskWrite;
void setup() {
// Create the pipe
int taskPipe[2];
::pipe(taskPipe);
taskRead = taskPipe[0];
taskWrite = taskPipe[1];
// Set up a routine that is called when task_r reports new data
function_that_polls_file_descriptor(taskRead, []() {
// Read the callback data
std::function<void(void)>* taskPtr;
::read(taskRead, &taskPtr, sizeof(taskPtr));
// Run the task - this is unsafe! See below.
(*taskPtr)();
// Clean up
delete taskPtr;
});
}
void post(const std::function<void(void)>& task) {
// Copy the function onto the heap
auto* taskPtr = new std::function<void(void)>(task);
// Write the pointer to the pipe - this may block if the FIFO is full!
::write(taskWrite, &taskPtr, sizeof(taskPtr));
}
Этот код помещает std::function
в кучу и передает указатель на канал.Затем function_that_polls_file_descriptor
вызывает предоставленное выражение, чтобы прочитать канал и выполнить функцию.Обратите внимание, что в этом примере нет проверок безопасности.
Это прекрасно работает в 99% случаев, но есть один существенный недостаток.Трубы имеют ограниченный размер, и если труба заполнена, вызовы post()
будут зависать.Само по себе это небезопасно, пока не будет выполнен вызов post()
в рамках задачи.
auto evil = []() {
// Post a new task back onto the queue
post({});
// Not enough new tasks, let's make more!
for (int i = 0; i < 3; i++) {
post({});
}
// Now for each time this task is posted, 4 more tasks will be added to the queue.
});
post(evil);
post(evil);
...
Если это произойдет, то рабочий поток будет заблокирован, ожидая записик трубе.Но FIFO канала заполнен, и рабочий поток ничего не читает из него, поэтому вся система находится в тупике.
Что можно сделать, чтобы гарантировать, что вызовы post()
исходят из рабочего потока всегда успешно, что позволяет работнику продолжить обработку очереди в случае ее заполнения?