Сбой маршрутизатора NServiceBus между двумя транспортами SQL - PullRequest
1 голос
/ 10 июля 2019

У меня есть два бизнес-домена (DomainA и DomainB), которые используют свой собственный SQL-транспорт в отдельных базах данных (отдельных серверах) для своих enpdpoints NServiceBus.

Я понял, что вы можете отправлять / публиковать сообщения из одногодомен к другому, подключив их через роутер.Официальная документация дает понять, что это возможно .В целях воспроизведения я взял обновление и опубликовал образец и изменил его так, чтобы обе конечные точки использовали транспорт SQL Server + постоянство SQL.

Цель состоит в том, чтобы опубликовать сообщение из DomainA и получить егообрабатывается в DomainB.

DomainA (веб-приложение) - нет конфигурации маршрутизатора, поскольку оно публикует только что-то для демонстрации:

var endpointConfiguration = new EndpointConfiguration("DomainA-Endpoint");

var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(ConnectionStrings.DomainA);
transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new SqlConnection(ConnectionStrings.DomainA);
    });
var subscriptions = persistence.SubscriptionSettings();
subscriptions.DisableCache();
endpointConfiguration.EnableInstallers();

DomainB - подключается к маршрутизатору для опубликованного события:

var endpointConfiguration = new EndpointConfiguration("DomainB-Endpoint");
var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(ConnectionStrings.DomainB);
transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new SqlConnection(ConnectionStrings.DomainB);
    });
var subscriptions = persistence.SubscriptionSettings();
subscriptions.DisableCache();
endpointConfiguration.EnableInstallers();

var routerConnector = transport.Routing().ConnectToRouter("DomainA-B-Router");
routerConnector.RegisterPublisher(
    eventType: typeof(OrderAccepted),
    publisherEndpointName: "DomainA-Endpoint");

И маршрутизатор:

var routerConfig = new RouterConfiguration("DomainA-B-Router");

var domainAInterface = routerConfig.AddInterface<SqlServerTransport>("DomainA", t =>
{
    t.ConnectionString(ConnectionStrings.DomainA);
    t.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
});
var domainASqlSubscriptionStorage = new SqlSubscriptionStorage(
    () => new SqlConnection(ConnectionStrings.Router),
    "DomainA-", new SqlDialect.MsSqlServer(), null);
domainAInterface.EnableMessageDrivenPublishSubscribe(domainASqlSubscriptionStorage);

var domainBInterface = routerConfig.AddInterface<SqlServerTransport>("DomainB", t =>
{
    t.ConnectionString(ConnectionStrings.DomainB);
    t.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
});
var domainBSqlSubscriptionStorage = new SqlSubscriptionStorage(
    () => new SqlConnection(ConnectionStrings.Router),
    "DomainB-", new SqlDialect.MsSqlServer(), null);
domainBInterface.EnableMessageDrivenPublishSubscribe(domainBSqlSubscriptionStorage);

var staticRouting = routerConfig.UseStaticRoutingProtocol();
staticRouting.AddForwardRoute("DomainA", "DomainB");
staticRouting.AddForwardRoute("DomainB", "DomainA");

domainASqlSubscriptionStorage.Install().GetAwaiter().GetResult();
domainBSqlSubscriptionStorage.Install().GetAwaiter().GetResult();

routerConfig.AutoCreateQueues();

Код для запуска конечных точек и маршрутизатор опущены для краткости.

Строки подключения приведены ниже - используйте другой экземпляр SQL для DomainAи DomainB:

public class ConnectionStrings
{
    public const string DomainA = @"Data Source=(local);Initial Catalog=Nsb-DomainA-Endpoint-DB;Integrated Security=True;Max Pool Size=100";
    public const string DomainB = @"Data Source=(localDB)\MSSQLLocalDB;Initial Catalog=Nsb-DomainB-Endpoint-DB;Integrated Security=True;Max Pool Size=100";
    public const string Router = @"Data Source=(local);Initial Catalog=Nsb-DomainA-B-Router-DB;Integrated Security=True;Max Pool Size=100";
}

Когда я запускаю образец, я получаю следующую ошибку в маршрутизаторе:

2019-07-10 11:46:08.889 WARN  RepeatedFailuresCircuitBreaker The circuit breaker for DomainA-B-Router is now in the armed state
2019-07-10 11:46:15.955 WARN  RepeatedFailuresCircuitBreaker The circuit breaker for DomainA-B-Router will now be triggered
2019-07-10 11:46:15.991 ERROR ThrottlingRawEndpointConfig`1[[NServiceBus.SqlServerTransport, NServiceBus.Transport.SqlServer, Version=4.0.0.0, Culture=neutral, PublicKeyToken=9fc386479f8a226c]] Persistent error while processing messages in DomainA-B-Router. Entering throttled mode.
NServiceBus.Unicast.Queuing.QueueNotFoundException: Failed to send message to [Nsb-DomainA-Endpoint-DB].[dbo].[DomainA-Endpoint] ---> System.Data.SqlClient.SqlException: Invalid object name 'Nsb-DomainA-Endpoint-DB.dbo.DomainA-Endpoint'.
   at System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
   at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj, Boolean callerHasConnectionLock, Boolean asyncClose)
   at System.Data.SqlClient.TdsParser.TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj, Boolean& dataReady)
   at System.Data.SqlClient.SqlCommand.FinishExecuteReader(SqlDataReader ds, RunBehavior runBehavior, String resetOptionsString, Boolean isInternal, Boolean forDescribeParameterEncryption, Boolean shouldCacheForAlwaysEncrypted)
   at System.Data.SqlClient.SqlCommand.CompleteAsyncExecuteReader(Boolean isInternal, Boolean forDescribeParameterEncryption)
   at System.Data.SqlClient.SqlCommand.InternalEndExecuteNonQuery(IAsyncResult asyncResult, String endMethod, Boolean isInternal)
   at System.Data.SqlClient.SqlCommand.EndExecuteNonQueryInternal(IAsyncResult asyncResult)
   at System.Data.SqlClient.SqlCommand.EndExecuteNonQueryAsync(IAsyncResult asyncResult)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.Transport.SQLServer.TableBasedQueue.<SendRawMessage>d__10.MoveNext()
   --- End of inner exception stack trace ---
   at NServiceBus.Transport.SQLServer.TableBasedQueue.ThrowQueueNotFoundException(SqlException ex)
   at NServiceBus.Transport.SQLServer.TableBasedQueue.<SendRawMessage>d__10.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.Transport.SQLServer.TableBasedQueueDispatcher.<Send>d__5.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.Transport.SQLServer.TableBasedQueueDispatcher.<DispatchUsingReceiveTransaction>d__4.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.Transport.SQLServer.TableBasedQueueDispatcher.<DispatchAsNonIsolated>d__2.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.Transport.SQLServer.MessageDispatcher.<Dispatch>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
   at PostroutingTerminator.<Terminate>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
   at NServiceBus.Router.ChainTerminator`1.<Invoke>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
   at NServiceBus.Router.TerminatorInvocationRule`1.<Invoke>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
   at ForwardSubscribeMessageDrivenRule.<Terminate>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
   at NServiceBus.Router.ChainTerminator`1.<Invoke>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
   at NServiceBus.Router.TerminatorInvocationRule`1.<Invoke>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
   at SubscribePreroutingTerminator.<Terminate>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
   at NServiceBus.Router.ChainTerminator`1.<Invoke>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
   at NServiceBus.Router.TerminatorInvocationRule`1.<Invoke>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
   at StorageDrivenSubscriptionRule.<Invoke>d__2.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
   at PreroutingToSubscribePreroutingFork.<Terminate>d__0.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
   at NServiceBus.Router.ChainTerminator`1.<Invoke>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
   at NServiceBus.Router.TerminatorInvocationRule`1.<Invoke>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
   at ThrottlingRawEndpointConfig`1.<>c__DisplayClass1_0.<<PrepareConfig>b__1>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.Transport.SQLServer.ReceiveStrategy.<TryProcessingMessage>d__13.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.Transport.SQLServer.ProcessWithNativeTransaction.<TryProcess>d__3.MoveNext()

Мы видим, что маршрутизатор пытается получить доступ к базе данных DomainA,и при отладке мы видим, что это происходит из соединения DomainB, которое не может работать, поскольку они находятся на другом сервере.Если я изменяю строку подключения DomainB, чтобы она указывала на один и тот же экземпляр SQL, все работает нормально (при условии, что один и тот же пользователь имеет доступ ко всем БД).

Я думал, что роль маршрутизатора заключается в перемещении сообщений междуслучаи, но я не могу этого достичь.Я что-то не так делаю?

Почему маршрутизатор и конечные точки должны использовать одно и то же соединение для данных подписки?

Спасибо за помощь!

Полный код доступен здесь .

...