Ошибка Masstransit при попытке подключить кластер rabbitmq на k8s - PullRequest
0 голосов
/ 29 февраля 2020

У меня кластер rabbitmq на k8s. У меня есть приложение, которое использует MassTransit с rabbitmq. Когда я запускаю приложение на k8s при запуске, MassTransit пишет об ошибке. Я уверен, что rabbitmq достижим, потому что я также добавил проверку здоровья rabbitmq, которая возвращает здоровье. Я поделился также сервисом yaml и информацией о хосте.

Также я могу видеть, что обмен идентификационными данными создается из пользовательского интерфейса управления rabbitmq

Startup.cs

        var rabbitConfig = new RabbitMqConfig();
        rabbitConfig.Host = Configuration.GetValue<string>("RABBIT_HOST");
        rabbitConfig.UserName = Configuration.GetValue<string>("RABBIT_USER_NAME");
        rabbitConfig.Password = Configuration.GetValue<string>("RABBIT_PASSWORD");
        rabbitConfig.Port = Configuration.GetValue<int>("RABBIT_PORT");

        services.AddSingleton(rabbitConfig);

        var factory = new ConnectionFactory()
        {
            HostName = rabbitConfig.Host,
            Password = rabbitConfig.Password,
            UserName = rabbitConfig.UserName,
            VirtualHost = "/",
            Port = rabbitConfig.Port,
            AutomaticRecoveryEnabled = true
        };

        var connection = factory.CreateConnection();
        services.AddSingleton<IConnection>(connection);

        string connectionString =  Configuration.GetValue<string>("CONNECTION_STRING");

        services.AddDbContext<DataContext>(options =>
        {
            options.UseMySql(connectionString);
        }); 

        services.AddHealthChecks()
            .AddMySql(connectionString, "MySQL")
            .AddRabbitMQ(name: "Rabbit");

        services.AddMassTransit( x=> {   

            x.AddConsumers(typeof(Startup).Assembly);

            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg => {
                Action<MassTransit.RabbitMqTransport.IRabbitMqHostConfigurator> configure = h =>
                {
                    h.Username(rabbitConfig.UserName);
                    h.Password(rabbitConfig.Password);
                };
                cfg.Host(new Uri($"rabbitmq://{rabbitConfig.Host}"),"/", configure);
                cfg.ReceiveEndpoint("identity", e=> {
                    e.UseMessageRetry(x => x.Interval(2, 100));                    
                });
            }));
        }); 

        services.AddSingleton<IHostedService, BusService>(); 

Service.yaml

kind: Service
apiVersion: v1
metadata:
  namespace:  rabbitmq
  name: rabbitmq
  labels:
    app: rabbitmq
spec:
  selector:
    app: rabbitmq
  ports:
   - name: rabbitmq-mgmt-port
     protocol: TCP
     port: 15672
     targetPort: 15672
   - name: rabbitmq-amqp-port
     protocol: TCP
     port: 5672
     targetPort: 5672

хост:

rabbitmq.rabbitmq.svc.cluster.local

Когда я развертываю свое приложение в кластере k8s, MassTransit пишет ошибку, как показано ниже.

MassTransit [0] Операция прервана: RabbitMQ.Client.Exceptions.OperationInterruptedException: операция AMQP был прерван: причина закрытия AMQP, инициированная Peer, код = 541, текст = 'INTERNAL_ERROR', classId = 0, methodId = 0 в RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply (время ожидания TimeSpan) в RabbitMQ.Client.Imp. ModelBase.QueueDeclare (Строковая очередь, Булево пассивное, Булево долговременное, Булево исключительное, Булево автоматическое удаление, IDictionary 2 arguments) at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary 2 аргумента) в MassTransit.RabbitMqTransport.Contexts.RabbitMqModelContext. <> C__DisplayClass19Tk.tas_t___ .___ .__ 1.InnerInvoke() at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state) --- End of stack trace from previous location where exception was thrown --- at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread) --- End of stack trace from previous location where exception was thrown --- at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter 1.ConfigureTopology (контекст ModelContext) в MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter 1.<>c__DisplayClass3_0.<<GreenPipes-IFilter<MassTransit-RabbitMqTransport-ModelContext>-Send>b__0>d.MoveNext() --- End of stack trace from previous location where exception was thrown --- at GreenPipes.PipeExtensions.OneTimeSetup[T](PipeContext context, Func 2 se tupMethod, PayloadFactory 1 payloadFactory) at MassTransit.RabbitMqTransport.Pipeline.ConfigureTopologyFilter 1.GreenPipes.IFilter.Send (контекст ModelContext, IPipe 1 next) at MassTransit.RabbitMqTransport.Pipeline.ReceiveEndpointFilter.GreenPipes.IFilter<MassTransit.RabbitMqTransport.ConnectionContext>.Send(ConnectionContext context, IPipe 1 следующий) в MassTransit.RabbitMqTransport.Pipeline.ReceiveEndpointFilter.GreenPipes.IFilter.Send (контекст ConnectionContext, 1.Green 10. IPipeContextSource.Send (IPipe 1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor 1.GreenPipes.IPipeContextSource.Send (IPipe 1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor 1.GreenPipes.IPipeContextSource.Send (канал IPipe`1, CancellationToken cancellationToken) в MassTransit.RabbitMqTransport.Transport.Transport.Transport. *

...