Несколько потоков, читающих уникальные записи из одной таблицы с Entity Framework - PullRequest
0 голосов
/ 20 июня 2019

У нас есть несколько параллельных потоков, которые одновременно "запрашивают" необработанные записи из таблицы базы данных для работы. Чтобы гарантировать, что ни один из потоков не получит повторяющиеся записи, мы используем запрос, который выглядит примерно так:

WITH UpdateView AS
(
    SELECT TOP 1 X, Y, Z, Processed
    FROM MyTable
    WHERE Processed = 0
)
UPDATE UpdateView
SET Processed = 1
OUTPUT INSERTED.X, INSERTED.Y, INSERTED.Z, INSERTED.Processed

У меня вопрос: есть ли эквивалентный способ сделать что-то подобное через Entity Framework 6.0+, который сравнительно эффективен, или это не очень хороший вариант использования для EF?

Обновление

Итак, то, что я ищу, будет примерно эквивалентно этому:

IEnumerable<MyTable> results = context.MyTable
    .Where(r => !r.Processed)
    .Take(1) // Up to this point, it effectively builds the UPdateView portion of the original query but thanks to deferred execution, no data is actually retrieved yet.
    .UpdateAndReturn(context, r => r.Processed = true); // Hypothetical extension method that would perform the update and retrieve the updated record(s) as an atomic operation.

К сожалению, я не знаю, как через EF добавить операцию обновления в запрос linq отложенного выполнения.

1 Ответ

1 голос
/ 21 июня 2019

Акт резервирования записи должен быть упорядочен одним контекстом в одном потоке.Например, учитывая несколько рабочих потоков, каждый из которых имеет свой собственный DbContext, вы столкнетесь с ситуацией, в которой вы оказались, когда каждый рабочий мог в конечном итоге одновременно запросить Processed = 0 и получить перекрытия.

Вместо этого я хотел бы рассмотреть возможность добавления столбца ProcessorId к записям и привязки идентификатора к каждому из ваших рабочих потоков.Каждый рабочий поток будет запрашивать, где ProcessorId = MyProcessorId && Processed == 0. Когда у работника нет необработанных записей, он вызывает маршалированный отдельный поток с идентификатором процессора, который просматривает ProcessorId IS NULL и Processed == 0, а затем назначает одну или несколько строкна идентификатор процессора и возвращает.В зависимости от объема обработки вы можете распределять их по одному за раз или пакетами по 10/50/100 и т. Д.

Обновление: вы можете зарезервировать запись в поточно-ориентированном режиме с помощью транзакциичтобы заблокировать записи таблицы достаточно долго, чтобы выбрать одну и установить ее флаг Обработано.Если запись «плоская» без каких-либо ссылок и т. Д., О которых следует беспокоиться, вы можете отсоединить ее после установки флажка, чтобы связать ее с контекстом, не относящимся к tx, где вы будете выполнять обработку и т. Д., Не блокируя таблицу.В противном случае просто возьмите идентификатор записи и, при необходимости, перезагрузите его после завершения блокировки Tx.

Т.е.

UpdateView view = null;
using(var context = new MyContext())
{
    using (var tx = context.Database.BeginTransaction())
    {
        view = context.UpdateViews
        .Where(x => !x.IsProcessed)
        .OrderBy(x => x.CreatedDate)
        .FirstOrDefault();
        if (view != null)
        {
            view.IsProcessed = true;
            context.SaveChanges();
            context.Detach(view);
        }
        tx.Commit();
    }
}
if(view == null)
    return;

using(var context = new MyContext())
{
    context.UpdateViews.Attach(view);
    // continue processing...
}

Вам может не понадобиться 2x контексты, просто используйте его внеобласть Tx с тем же контекстом должна быть в порядке, но я просто поместил эту опцию на всякий случай.Если вы хотите загружать дочерние или связанные сущности, то я бы оставил резервирование прочитанным только в записи без загрузки, установил флаг обработки и взял бы идентификатор, затем, после закрытия Tx, снова загрузил бы сущность / wнетерпеливая загрузка.Это позволит максимально сократить период блокировки между чтением и фиксацией.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...