Как зафиксировать UoW после вызова всех обработчиков - PullRequest
0 голосов
/ 10 января 2019

У меня есть сценарий, в котором все обработчики на одном узле должны работать на одной «единице работы», которая фиксируется после вызова всех обработчиков. Я думаю, что лучший способ сделать следующее:

Когда сообщение получено, выполните следующие действия как часть конвейера:

  1. Создать новый экземпляр DbContext (UoW)
  2. Вызывает обработчики и передает экземпляр DbContext
  3. Если все обработчики вызываются без ошибок, вызовите DbContext.SaveChanges
  4. Утилизировать DbContext

Можете ли вы дать мне подсказку о том, как настроить конвейер Rebus для соответствия вышеуказанным требованиям?

EDIT:

Я закончил с этим:

private static IBus _bus;

public void ConfigureServices(IServiceCollection services)
{
    services.AddDbContext<MyDbContext>();
    services.AddMvc();
    services.AddTransient<IBus>(sp => _bus);
}

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
   StartRebus(app);
   ...
}

public static void StartRebus(this IApplicationBuilder app)
{
    var rebusServices = new ServiceCollection();
    rebusServices.AutoRegisterHandlersFromAssemblyOf<ActivityDenormalizer>();
    rebusServices.AddTransient<MyDbContext>(sp =>
    {
        var messageContext = MessageContext.Current
            ?? throw new InvalidOperationException("MessageContext.Current is null.");

        return messageContext.TransactionContext.Items
            .GetOrThrow<MyDbContext>(nameof(MyDbContext));
    });


    rebusServices.AddRebus((configure, sp) => configure
        .Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "Messages"))
        .Options(o =>
        {
            o.EnableUnitOfWork<MyDbContext>(
                unitOfWorkFactoryMethod: messageContext =>
                {
                    //create new dbcontext instance regardless of ServiceLifeTime.
                    //Notice that I'm using ApplicationServices here, not RebusServices.
                    var dbContext = ActivatorUtilities.CreateInstance<MyDbContext>(app.ApplicationServices);
                    messageContext.TransactionContext.Items[nameof(MyDbContext)] = dbContext;
                    return dbContext;
                },
                commitAction: (messageContext, dbContext) => dbContext.SaveChanges(),
                cleanupAction: (messageContext, dbContext) => dbContext.Dispose());

        }));

    var rebusServiceProvider = rebusServices.BuildServiceProvider();
    rebusServiceProvider.UseRebus();
    _bus = rebusServiceProvider.GetRequiredService<IBus>();
}

прикладные сервисы и сервисы rebus взаимосвязаны в двух местах:

  1. IBus разрешается rebusServiceProvider, но экземпляр регистрируется также в службах приложений, поэтому я могу отправлять ему сообщения из своего приложения.

  2. Зависимости MyDbContext (DbContextOptions и т. Д.) Разрешаются ApplicationServices, но DbContext также создается в Rebus 'unitOfWorkFactoryMethod и регистрируется в rebusServices, чтобы его можно было вводить в обработчики Rebus.

1 Ответ

0 голосов
/ 10 января 2019

Да - проверить Rebus.UnitOfWork - в нем есть нужные вам крючки.

В частности, для Entity Framework вы должны сделать что-то вроде этого:

Configure.With(new CastleWindsorContainerAdapter(container))
    .Transport(t => t.UseMsmq("test"))
    .Options(o => o.EnableUnitOfWork(
        async context => new CustomUnitOfWork(context, connectionString),
        commitAction: async (context, uow) => await uow.Commit()
    ))
    .Start();

где CustomUnitOfWork будет выглядеть примерно так:

class CustomUnitOfWork
{
    public const string ItemsKey = "current-db-context";

    readonly MyDbContext _dbContext;

    public CustomUnitOfWork(IMessageContext messageContext, string connectionString)
    {
        _dbContext = new MyDbContext(connectionString);
        messageContext.TransactionContext.Items[ItemsKey] = this;
    }

    public async Task Commit()
    {
        await _dbContext.SaveChangesAsync();
    }

    public MyDbContext GetDbContext() => _dbContext;
}

и затем вы настроите свой контейнер IoC для разрешения MyDbContext, извлекая его из текущего контекста сообщения - с помощью Castle Windsor это будет сделано так:

container.Register(
    Component.For<CustomUnitOfWork>()
        .UsingFactoryMethod(k =>
        {
            var messageContext = MessageContext.Current
                ?? throw new InvalidOperationException("Can't inject uow outside of Rebus handler");

            return messageContext.TransactionContext.Items
                .GetOrThrow<CustomUnitOfWork>(CustomUnitOfWork.ItemsKey);
        })
        .LifestyleTransient(),

    Component.For<MyDbContext>()
        .UsingFactoryMethod(k => k.Resolve<CustomUnitOfWork>().GetDbContext())
        .LifestyleTransient()
);
...