UnitOfWork и DbContext: безопасность потоков с DI - PullRequest
0 голосов
/ 04 апреля 2019

Я работаю над консольным приложением .NET Core 2.2, в котором размещено IHostedService:

public class MqttClientHostedService : IHostedService, IDisposable
{
    [...]
    public MqttClientHostedService(
        ILogger<MqttClientHostedService> logger, 
        IOptions<MqttClientConfiguration> mqttConfiguration, 
        IPositionService positionService)
    {
        this.logger = logger;
        this.config = mqttConfiguration;
        this.positionService = positionService;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        mqttClient = new MqttFactory().CreateMqttClient();
        mqttClient.Connected += async (s, e) => await MqttClient_Connected(s, e);
        mqttClient.ApplicationMessageReceived +=
            async (s, e) => await MqttClient_ApplicationMessageReceived(s, e);

        await mqttClient.ConnectAsync(
            new MqttClientOptionsBuilder()
                .WithTcpServer(config.Value.Host, config.Value.Port).Build());
    }

    private async Task MqttClient_ApplicationMessageReceived(
        object sender, MqttApplicationMessageReceivedEventArgs e)
    {
        string message = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
        await positionService.HandleMessage(message);
    }

    [...]
}

Это IPositionService менеджер, который проверяет сообщение и проверяет, можно ли его сохранить в нашей базе данных:

public class PositionService : IPositionService
{
    [...]

    public PositionService(
        IUnitOfWork unitOfWork, ILogger<PositionService> logger)
    {
        this.unitOfWork = unitOfWork;
        this.logger = logger;
    }

    public async Task HandleMessage(string message)
    {
        Entity entity = await unitOfWork.EntityRepository.GetByMessage(message);

        [...]

        await unitOfWork.EntityRepository.UpdateAsync(entity);
        await unitOfWork.Save();
    }

    [...]
}

IUnitOfWork - оболочка для ядра Entity Framework DbContext (пожалуйста, не судите меня, у меня есть причины для этого):

public class UnitOfWork : IUnitOfWork
{
    [...]

    public UnitOfWork(MyContext myContext)
    {
        this.myContext = myContext;

        EntityRepository = new EFRepository<Entity>(myContext);
    }

    public async Task Save()
    {
        await myContext.SaveChangesAsync();
    }
}

EFRepository<T>, который реализует интерфейс IRepository<T>, является оберткой вокруг DbSet<T> (опять же, пожалуйста, не судите меня). Нет соответствующего кода здесь.

Файл Program.cs консольного приложения настроен так:

[...]
.ConfigureServices((hostContext, services) =>
{
    services.AddDbContext<MyContext>(
        c => c.UseSqlServer("[...]", options => options.UseNetTopologySuite()),
        ServiceLifetime.Transient);

    services.AddTransient<IPositionService, PositionService>();
    services.AddTransient(typeof(IRepository<>), typeof(EFRepository<>));
    services.AddTransient<IUnitOfWork, UnitOfWork>();

    services.AddHostedService<MqttClientHostedService>();

    [...]
});

Проблема в том, что PositionService.HandleMessage вызывается много раз в секунду, и что DbContext не является поточно-ориентированным, я получаю это сообщение об ошибке:

Вторая операция началась в этом контексте перед предыдущей операцией завершено.

Я решил эту проблему, удалив IUnitOfWork из зависимостей PositionService, вставив вместо него IServiceScopeFactory и выполнив:

using (IServiceScope serviceScope = serviceScopeFactory.CreateScope())
{
    IUnitOfWork unitOfWork = serviceScope.ServiceProvider.GetService<IUnitOfWork>();

    [...]
}

Этот способ работает, но мне это не нравится. Это похоже на трюк, и мне не нравится тот факт, что мой PositionService знает о Dependency Injection и имеет дело с областями действия.

Мой вопрос: есть лучший способ решить эту проблему, не касаясь моих занятий? Должен ли я сделать весь поток UnitOfWork безопасным? Или, может быть, создать его вручную без использования DI?

1 Ответ

2 голосов
/ 04 апреля 2019

Источник проблемы заключается в том, что MyContext удерживается в плену в качестве зависимой зависимости в следующем графе объектов:

MqttClientHostedService
    -> PositionService
        -> UnitOfWork
            -> MyContext 

Все типы на этом графике зарегистрированы как Transient, но, тем не менее, службы, которые действуют как размещенная служба (например, MqttClientHostedService), разрешаются только один раз на время работы приложения и кэшируются на неопределенный срок. , Это делает их единственными.

Другими словами, MyContext случайно поддерживается живым одним MqttClientHostedService, и, поскольку несколько сообщений могут поступать параллельно, у вас есть состояние гонки.

Решение состоит в том, чтобы каждое событие ApplicationMessageReceived выполнялось в своем собственном уникальном маленьком пузырьке (области действия) и разрешало новый IPositionService из этого пузыря. Например:

public class MqttClientHostedService : IHostedService, IDisposable
{
    [...]
    public MqttClientHostedService(
        ILogger<MqttClientHostedService> logger, 
        IOptions<MqttClientConfiguration> mqttConfiguration, 
        IServiceProvider provider)
    {
        this.logger = logger;
        this.config = mqttConfiguration;
        this.provider = provider;
    }
    [...]
    private async Task MqttClient_ApplicationMessageReceived(
        object sender, MqttApplicationMessageReceivedEventArgs e)
    {
        using (var scope = provider.CreateScope())
        {
            positionService = scope.ServiceProvider
                .GetRequiredService<IPositionService>();
            string message = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
            await positionService.HandleMessage(message);
        }
    }

    [...]
}
...