У меня кластер rabbitmq на k8s. У меня есть приложение, которое использует MassTransit с rabbitmq. Когда я запускаю приложение на k8s при запуске, MassTransit пишет об ошибке. Я уверен, что rabbitmq достижим, потому что я также добавил проверку здоровья rabbitmq, которая возвращает здоровье. Я поделился также сервисом yaml и информацией о хосте.
Также я могу видеть, что обмен идентификационными данными создается из пользовательского интерфейса управления rabbitmq
Startup.cs
var rabbitConfig = new RabbitMqConfig();
rabbitConfig.Host = Configuration.GetValue<string>("RABBIT_HOST");
rabbitConfig.UserName = Configuration.GetValue<string>("RABBIT_USER_NAME");
rabbitConfig.Password = Configuration.GetValue<string>("RABBIT_PASSWORD");
rabbitConfig.Port = Configuration.GetValue<int>("RABBIT_PORT");
services.AddSingleton(rabbitConfig);
var factory = new ConnectionFactory()
{
HostName = rabbitConfig.Host,
Password = rabbitConfig.Password,
UserName = rabbitConfig.UserName,
VirtualHost = "/",
Port = rabbitConfig.Port,
AutomaticRecoveryEnabled = true
};
var connection = factory.CreateConnection();
services.AddSingleton<IConnection>(connection);
string connectionString = Configuration.GetValue<string>("CONNECTION_STRING");
services.AddDbContext<DataContext>(options =>
{
options.UseMySql(connectionString);
});
services.AddHealthChecks()
.AddMySql(connectionString, "MySQL")
.AddRabbitMQ(name: "Rabbit");
services.AddMassTransit( x=> {
x.AddConsumers(typeof(Startup).Assembly);
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg => {
Action<MassTransit.RabbitMqTransport.IRabbitMqHostConfigurator> configure = h =>
{
h.Username(rabbitConfig.UserName);
h.Password(rabbitConfig.Password);
};
cfg.Host(new Uri($"rabbitmq://{rabbitConfig.Host}"),"/", configure);
cfg.ReceiveEndpoint("identity", e=> {
e.UseMessageRetry(x => x.Interval(2, 100));
});
}));
});
services.AddSingleton<IHostedService, BusService>();
Service.yaml
kind: Service
apiVersion: v1
metadata:
namespace: rabbitmq
name: rabbitmq
labels:
app: rabbitmq
spec:
selector:
app: rabbitmq
ports:
- name: rabbitmq-mgmt-port
protocol: TCP
port: 15672
targetPort: 15672
- name: rabbitmq-amqp-port
protocol: TCP
port: 5672
targetPort: 5672
хост:
rabbitmq.rabbitmq.svc.cluster.local
Когда я развертываю свое приложение в кластере k8s, MassTransit пишет ошибку, как показано ниже.
MassTransit [0] Операция прервана: RabbitMQ.Client.Exceptions.OperationInterruptedException: операция AMQP был прерван: причина закрытия AMQP, инициированная Peer, код = 541, текст = 'INTERNAL_ERROR', classId = 0, methodId = 0 в RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply (время ожидания TimeSpan) в RabbitMQ.Client.Imp. ModelBase.QueueDeclare (Строковая очередь, Булево пассивное, Булево долговременное, Булево исключительное, Булево автоматическое удаление, IDictionary 2 arguments)
at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary
2 аргумента) в MassTransit.RabbitMqTransport.Contexts.RabbitMqModelContext. <> C__DisplayClass19Tk.tas_t___ .___ .__ 1.InnerInvoke()
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location where exception was thrown ---
at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
--- End of stack trace from previous location where exception was thrown ---
at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter
1.ConfigureTopology (контекст ModelContext) в MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter 1.<>c__DisplayClass3_0.<<GreenPipes-IFilter<MassTransit-RabbitMqTransport-ModelContext>-Send>b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at GreenPipes.PipeExtensions.OneTimeSetup[T](PipeContext context, Func
2 se tupMethod, PayloadFactory 1 payloadFactory)
at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter
1.GreenPipes.IFilter.Send (контекст ModelContext, IPipe 1 next)
at MassTransit.RabbitMqTransport.Pipeline.ReceiveEndpointFilter.GreenPipes.IFilter<MassTransit.RabbitMqTransport.ConnectionContext>.Send(ConnectionContext context, IPipe
1 следующий) в MassTransit.RabbitMqTransport.Pipeline.ReceiveEndpointFilter.GreenPipes.IFilter.Send (контекст ConnectionContext, 1.Green 10. IPipeContextSource.Send (IPipe 1 pipe, CancellationToken cancellationToken)
at GreenPipes.Agents.PipeContextSupervisor
1.GreenPipes.IPipeContextSource.Send (IPipe 1 pipe, CancellationToken cancellationToken)
at GreenPipes.Agents.PipeContextSupervisor
1.GreenPipes.IPipeContextSource.Send (канал IPipe`1, CancellationToken cancellationToken) в MassTransit.RabbitMqTransport.Transport.Transport.Transport. *