При попытке обработать несколько сообщений в быстрой последовательности, используя MassTransit RabbitMQ и StructureMap, я получаю сообщение об ошибке:
Обнаружены двунаправленные зависимости!
Это происходит только для потребителей, ошибка выглядит следующим образом:
MT-Fault-ExceptionType: StructureMap.Building.StructureMapBuildException
MT-Fault-Message: Обнаружена двунаправленная зависимость!
Проверьте трассировку стека StructureMap ниже:
1.) Экземпляр Acme.Co.WorkerService.Consumers.SomeCommandConsumer
MT-Fault-StackTrace: at lambda_method (Закрытие, IBuildSession, IContext)
в StructureMap.Building.BuildPlan.Build (сеанс IBuildSession, контекст IContext)
at StructureMap.SessionCache.GetObject (Тип pluginType, Экземпляр экземпляра, жизненный цикл ILifecycle)
в StructureMap.SessionCache.GetDefault (Тип pluginType, IPipelineGraph pipelineGraph)
в StructureMap.BuildSession.GetInstanceT
в MassTransit.Pipeline.ConsumerFactories.DelegateConsumerFactory 1.<Send>d__2
1.MoveNext ()
в MassTransit.Pipeline.Filters.ConsumerMessageFilter 2.<GreenPipes-IFilter<MassTransit-ConsumeContext<TMessage>>-Send>d__4.MoveNext()
at MassTransit.Pipeline.Filters.ConsumerMessageFilter
2.> - Отправить> d__4.MoveNext ()
в GreenPipes.Filters.TeeFilter 1.<Send>d__5.MoveNext()
at GreenPipes.Filters.OutputPipeFilter
2.d__7.MoveNext ()
в GreenPipes.Filters.OutputPipeFilter 2.<SendToOutput>d__7.MoveNext()
at GreenPipes.Filters.DynamicFilter
1.d__9.MoveNext ()
в MassTransit.Pipeline.Filters.DeserializeFilter.d__4.MoveNext ()
в GreenPipes.Filters.RescueFilter`2.-Send> d__5.MoveNext ()
Я не вижу циклических или двунаправленных зависимостей, но я могу что-то упустить.
Код для Потребителя Команды (по существу):
public class SomeCommandConsumer : IConsumer<ISomeCommand>
{
private readonly IRepository _repository;
private readonly IAnotherRepository _anotherRepository;
public SomeCommandConsumer(IRepository repository, IAnotherRepository anotherRepository)
{
_repository = repository;
_anotherRepository = anotherRepository;
}
public async Task Consume(ConsumeContext<ISomeCommand> context)
{
try
{
await DoSomeWork(context);
await context.Publish<ISomeProcessCompleteEvent>(new
{
context.Message.ID,
SomeProperty = SomeEnum.SomeValue
});
}
catch (Exception e)
{
HandleException(e);
}
}
}
И регистрация StructureMap эффективно:
public class WorkflowServiceRegistry : Registry
{
public WorkflowServiceRegistry()
{
var configuration = FabricRuntime.GetActivationContext().GetConfigurationPackageObject(Constants.ServiceFabricConfigurationSection);
var baseUri = configuration.Settings.Sections[Constants.ConfigurationRabbitMqConfigSection].Parameters[Constants.ConfigurationRabbitMqBaseUri].Value;
var username = configuration.Settings.Sections[Constants.ConfigurationRabbitMqConfigSection].Parameters[Constants.ConfigurationRabbitMqUsername].Value;
var password = configuration.Settings.Sections[Constants.ConfigurationRabbitMqConfigSection].Parameters[Constants.ConfigurationRabbitMqPassword].Value;
var connectionString = configuration.Settings.Sections[Constants.ConfigurationDatabaseSection].Parameters[Constants.ConfigurationDatabaseConnectionString].Value;
var dbTimeout = int.Parse(configuration.Settings.Sections[Constants.ConfigurationDatabaseSection].Parameters[Constants.ConfigurationDatabaseTimeout].Value);
For<IAnotherRepository>().Use<AnotherRepository>();
For<IRepository>()
.Use<Repository>()
.Ctor<string>()
.Is(connectionString);
ForConcreteType<SomethingCompleteEventConsumer>()
.Configure
.Ctor<string>("baseUri")
.Is(baseUri);
ForConcreteType<SomeCommandConsumer>();
ForConcreteType<SomethingCompleteEventConsumeObserver>();
For<IBusControl>().Use(container => BusConfigurator.ConfigureBus(
baseUri,
username,
password,
(cfg, host) => cfg.ReceiveEndpoint(host, Constants.ControllerSerivceAnalysisProcessStartQueue, SomeCommandQueueConfiguration(container)),
(cfg, host) => cfg.ReceiveEndpoint(host, Constants.ControllerSerivceActionServiceCompleteQueue, SomethingCompleteEventQueueConfiguration(container))
));
For<ICommunicationListener>().Use<MassTransitListener>();
}
private static Action<IRabbitMqReceiveEndpointConfigurator> SomeCommandQueueConfiguration(IContext container)
{
return e =>
{
e.Consumer(container.GetInstance<SomeCommandConsumer>);
};
}
private static Action<IRabbitMqReceiveEndpointConfigurator> SomethingCompleteEventQueueConfiguration(IContext container)
{
return e =>
{
e.Consumer(container.GetInstance<SomethingCompleteEventConsumer>);
e.Observer(container.GetInstance<SomethingCompleteEventConsumeObserver>());
};
}
}
Просто интересно, это кто-то видел раньше?
Обратите внимание, что код является частью решения сервисной фабрики, и у каждой службы есть свой аналогичный "загрузчик" или регистрация для StructureMap, а также, для краткости, часть кода отсутствует.