Я пытаюсь реализовать ситуацию, аналогичную продюсеру и потребителю, в моем драйвере уровня блока (в ядре Linux версии 2.6.39.1). make_request_fn
моего блочного драйвера получает поток struct bio
из приложения уровня пользователя. После получения этих БИО они стоят в очереди. Затем я создаю новый struct bio
, в котором будет храниться вся информация, присутствующая в БИО в очереди. Это новое «merged_bio» будет отправлено только в том случае, если в request_queue
драйвера нижнего уровня доступен слот struct request
. Тем временем мой блочный драйвер будет продолжать получать BIO из приложения уровня пользователя и помещать их в очередь (ситуация с плотной загрузкой). Теперь, чтобы уменьшить задержку, я хочу обеспечить, чтобы по мере того, как и когда BIO помещались в очередь, пакет, состоящий из самых старых BIO, исключался из очереди, тем самым минимизируя время простоя в очереди. Я не совсем понимаю, как мне добиться этой конвейерной ситуации (с низкой задержкой на запрос), когда мой драйвер make_request_fn
является производителем BIO, а функция, вызывающая submit_bio()
, является потребителем этих BIO. Возможные подходы включают в себя:
Тасклеты - Пусть тасклет непрерывно потребляет BIO из очереди, как только слот struct request
становится свободным в request_queue
. Этот подход не будет работать, потому что функция обработчика тасклетов не является атомарной, так как она вызывает submit_bio()
, что, в свою очередь, вызывает schedule()
.
Рабочие очереди - они могут иметь большую задержку, чем тасклеты, поскольку функция-обработчик для рабочей очереди может спать. Это может повлиять на производительность моего драйвера, поскольку BIO могут быть отправлены в драйвер более низкого уровня на намного позже, чем когда он был на самом деле поставлен в очередь make_request_fn
. Я не уверен, насколько сильно это повлияет на производительность (в моем драйвере реализовано устройство быстрой регистрации).
Другая проблема с рабочими очередями заключается в том, как и когда планировать рабочую очередь? merged_bio
необходимо отправить после того, как слот запроса станет доступен. Таким образом, должен существовать какой-то механизм «сигнализации», который будет планировать рабочую очередь, как только станет доступен struct request
. Я не вижу, как можно непрерывно отслеживать request_queue
для свободного слота без механизма сигнализации или poll()
, а затем явно планировать рабочую очередь. И да, мы не можем вызвать submit_bio()
из функции обратного вызова ранее завершенной BIO.
Потоки ядра - Я рассматривал это как третий вариант. Я не имею большого представления о потоках ядра, но вот как я планировал это сделать: мой драйвер make_request_fn
продолжал бы ставить в очередь BIO. Будет создан новый поток ядра, который будет непрерывно потреблять BIO из очереди только , когда станет доступен слот struct request
(и не иначе). Таким образом, каждый раз, когда этот поток ядра запланирован, он проверяет наличие пустого слота запроса, а затем использует пакет BIO из очереди и вызывает submit_bio()
.
Что-то умнее ??
Могут ли члены stackoverflow помочь мне выбрать разумный и эффективный способ реализации этого сценария? Спасибо!
ОБНОВЛЕНИЕ : я попробовал метод workqueue, но он просто привел к сбою моего ядра (/ var / log / messages содержал искаженный текст, поэтому у меня нет журналов для обмена). Вот как я реализовал рабочую очередь:
Структура данных, которая будет использоваться рабочей очередью:
struct my_work_struct {
struct work_struct wk;
pitdev_t *pd; /* Pointer to my block device */
int subdev_index; /* Indexes the disk that is currently in picture -- pd->subdev[subdev_index] */
};
struct pitdev_struct {
/* Driver related data */
struct my_work_struct *work;
} *pd;
typedef struct pitdev_struct pitdev_t;
Инициализировать мой рабочий элемент:
/* Allocate memory for both pd and pd->work */
INIT_WORK(&pd->work->wk, my_work_fn);
pd->work->pd = pd;
pd->work->subdev_index = 0;
Определение моей рабочей функции:
void my_work_fn(struct work_struct *work)
{
struct my_work_struct *temp = container_of(work, struct my_work_struct, wk);
pitdev_t *pd = temp->pd;
int sub_index = temp->subdev_index;
/* Create a BIO and submit it*/
submit_bio(WRITE, merged_bio);
}
В merged_bio->bi_end_io
я планирую свой рабочий элемент:
schedule_work(&pd->work->wk);
Это делается для того, чтобы следующий BIO, подлежащий отправке, планировался вскоре после того, как предыдущий BIO был успешно перенесен. Первый вызов submit_bio()
выполняется без с использованием рабочей очереди. Этот первый вызов submit_bio()
происходит без проблем; при вызове я вызываю submit_bio()
с помощью рабочего элемента, происходит сбой системы.
Есть идеи?