Использование таблицы базы данных в качестве очереди - PullRequest
40 голосов
/ 01 февраля 2010

Я хочу использовать таблицу базы данных в качестве очереди. Я хочу вставить в него и взять элементы из него в порядке вставки (FIFO). Мое главное соображение - производительность, потому что у меня тысячи транзакций каждую секунду. Поэтому я хочу использовать SQL-запрос, который дает мне первый элемент без поиска по всей таблице. Я не удаляю строку, когда читаю ее. SELECT TOP 1 ..... помогает здесь? Должен ли я использовать какие-либо специальные индексы?

Ответы [ 9 ]

31 голосов
/ 01 февраля 2010

Я бы использовал поле IDENTITY в качестве первичного ключа для предоставления уникально увеличивающегося идентификатора для каждого элемента в очереди и прикреплял к нему кластерный индекс. Это будет представлять порядок, в котором элементы были поставлены в очередь.

Чтобы сохранить элементы в таблице очередей во время их обработки, вам потребуется поле «статус», чтобы указать текущее состояние определенного элемента (например, 0 = ожидание, 1 = обрабатывается, 2 = обрабатывается). Это необходимо для предотвращения повторной обработки элемента.

При обработке элементов в очереди вам нужно будет найти следующий элемент в таблице, НЕ обрабатываемой в данный момент. Это должно быть сделано таким образом, чтобы предотвратить одновременное получение несколькими процессами одного и того же элемента для обработки одновременно, как показано ниже. Обратите внимание на табличные подсказки UPDLOCK и READPAST, о которых вы должны знать при реализации очередей.

например. в sproc, что-то вроде этого:

DECLARE @NextID INTEGER

BEGIN TRANSACTION

-- Find the next queued item that is waiting to be processed
SELECT TOP 1 @NextID = ID
FROM MyQueueTable WITH (UPDLOCK, READPAST)
WHERE StateField = 0
ORDER BY ID ASC

-- if we've found one, mark it as being processed
IF @NextId IS NOT NULL
    UPDATE MyQueueTable SET Status = 1 WHERE ID = @NextId

COMMIT TRANSACTION

-- If we've got an item from the queue, return to whatever is going to process it
IF @NextId IS NOT NULL
    SELECT * FROM MyQueueTable WHERE ID = @NextID

Если обработка элемента не удалась, хотите ли вы попробовать его позже? Если это так, вам нужно будет сбросить статус обратно на 0 или что-то еще. Это потребует больше размышлений.

В качестве альтернативы, не используйте таблицу базы данных в качестве очереди, а что-то вроде MSMQ - просто подумал, что я добавлю это в смесь!

7 голосов
/ 01 февраля 2010

Если вы не удалите обработанные строки, вам понадобится какой-нибудь флаг, который указывает, что строка уже обработана.

Поместите индекс на этот флаг, и на столбец, который вы собираетесь заказать.

Разделите вашу таблицу над этим флагом, чтобы отложенные транзакции не засоряли ваши запросы.

Если бы вы действительно получали 1.000 сообщений каждую секунду, это привело бы к 86.400.000 строкам в день. Возможно, вы захотите придумать способ очистить старые строки.

4 голосов
/ 01 февраля 2010

Все зависит от вашей базы данных двигателя / реализации.

Для меня простые очереди на таблицы со следующими столбцами:

id / task / priority / date_added

обычно работает.

Я использовал приоритет и задачу для группировки задач, а в случае двойной задачи я выбрал задачу с большим приоритетом.

И не волнуйтесь - для современных баз данных «тысячи» не представляют собой ничего особенного.

3 голосов
/ 01 февраля 2010

Это не будет проблемой, если вы используете что-то для отслеживания даты и времени вставки.Смотрите здесь для mysql опции .Вопрос в том, нужен ли вам когда-либо только абсолютный последний отправленный элемент или вам нужно выполнить итерацию.Если вам нужно выполнить итерацию, то вам нужно получить чанк с оператором ORDER BY, выполнить цикл и запомнить последний datetime , чтобы вы могли использовать его при получении следующего чанка.

2 голосов
/ 01 февраля 2010

Поскольку вы не удаляете записи из таблицы, вам нужно иметь составной индекс на (processed, id), где processed - это столбец, который указывает, была ли обработана текущая запись.

Лучше всего было бы создать секционированную таблицу для ваших записей и сделать поле PROCESSED ключом разделения. Таким образом, вы можете сохранить три или более локальных индекса.

Однако, если вы всегда обрабатываете записи в порядке id и имеете только два состояния, обновление записи означало бы просто взять запись с первого листа индекса и добавить ее к последнему листу

Текущая обработанная запись всегда будет иметь наименьшее id из всех необработанных записей и наибольшее id из всех обработанных записей.

2 голосов
/ 01 февраля 2010

Создание кластеризованного индекса по столбцу даты (или автоинкремента). Это сохранит строки в таблице примерно в порядке индекса и обеспечит быстрый доступ на основе индекса при ORDER BY индексированном столбце. Использование TOP X (или LIMIT X, в зависимости от вашей RDMBS) приведет к получению только первых x элементов из индекса.

Предупреждение о производительности: вы всегда должны просматривать планы выполнения ваших запросов (на реальных данных), чтобы убедиться, что оптимизатор не делает неожиданных вещей. Также попытайтесь сравнить свои запросы (опять же на реальных данных), чтобы иметь возможность принимать обоснованные решения.

2 голосов
/ 01 февраля 2010

возможно, добавление LIMIT = 1 к вашему оператору выбора поможет ... форсировать возврат после одного совпадения ...

1 голос
/ 08 января 2019

У меня был тот же общий вопрос «как превратить таблицу в очередь», и я не мог найти ответ, который хотел где-либо.

Вот что я придумал для Node / SQLite / better-sqlite3. В основном просто измените внутренние условия WHERE и ORDER BY для вашего варианта использования.

module.exports.pickBatchInstructions = (db, batchSize) => {
  const buf = crypto.randomBytes(8); // Create a unique batch identifier

  const q_pickBatch = `
    UPDATE
      instructions
    SET
      status = '${status.INSTRUCTION_INPROGRESS}',  
      run_id = '${buf.toString("hex")}',
      mdate = datetime(datetime(), 'localtime')
    WHERE
      id IN (SELECT id 
        FROM instructions 
        WHERE 
          status is not '${status.INSTRUCTION_COMPLETE}'
          and run_id is null
        ORDER BY
          length(targetpath), id
        LIMIT ${batchSize});
  `;
  db.run(q_pickBatch); // Change the status and set the run id

  const q_getInstructions = `
    SELECT
      *
    FROM
      instructions
    WHERE
      run_id = '${buf.toString("hex")}'
  `;
  const rows = db.all(q_getInstructions); // Get all rows with this batch id

  return rows;
};
0 голосов
/ 15 ноября 2012

Очень простое решение для того, чтобы не иметь транзакций, блокировок и т. Д., Состоит в использовании механизмов отслеживания изменений (не сбора данных).Он использует управление версиями для каждой добавленной / обновленной / удаленной строки, чтобы вы могли отслеживать, какие изменения произошли после определенной версии.

Итак, вы сохраняете последнюю версию и запрашиваете новые изменения.

Если запрос не удался, вы всегда можете вернуться назад и запросить данные из последней версии.Кроме того, если вы не хотите получать все изменения одним запросом, вы можете получить высший порядок n по последней версии и сохранить самую большую версию, которую я бы вам хотел запросить снова.

См. Это, например, Использование отслеживания изменений в SQL Server 2008

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...