MassTransit: потребляет сообщение из очереди Rabbit - PullRequest
0 голосов
/ 24 марта 2020

У меня есть служба (написана в Python), которая инициализирует обмены и очереди Rabbit.

Я пытаюсь создать другую службу, в. Net Core, которая должна принимать сообщение от указать c очередь, которая уже определена с использованием MassTransit.

До сих пор я не смог прослушать эту очередь ..

все, что я пытаюсь сделать (добавить ' ReceiveEndpoint »к конфигурации шины или добавление потребителя к шине с помощью« ConnectReceiveEndpoint »/« ConnectConsumer »или даже« Bind »моей конечной точки) - вызывает создание новой очереди и создание обмена вместо прослушивания того, что уже существует.

Как мне настроить мой автобус ??

Все мои попытки отмечены в комментариях:

private void ConfigureRabbitMqMassTransitServices(IServiceCollection services)
{
    var myServiceSettings = ..
    var hostSettings = ..

        IBusControl CreateBus(IServiceProvider serviceProvider)
        {
            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                cfg.Send<xxx>(x 
                     =>
                {
                    x.UseRoutingKeyFormatter(context => myServiceSettings.RoutingKey);
                });

                cfg.Message<xxx>(x
                    =>
                {
                    x.SetEntityName(myServiceSettings.RequestExchangeName);
                });

                cfg.Publish<xxx>(x
                    =>
                {
                    x.ExchangeType = ExchangeType.Direct;
                    x.Durable = true;
                });

                var host = cfg.Host(
                    new Uri(hostSettings.Address), host =>
                    {
                        host.Username(hostSettings.Username);
                        host.Password(hostSettings.Password);
                    }
                );

               // cfg.ReceiveEndpoint(host, theRelevantQueueName, q =>
               // {             
          ////            q.Bind(theRelevantExchangeName);        
               //     q.Consumer<RelevantConsumer>(serviceProvider);
               // });    
            });

            //bus.ConnectReceiveEndpoint(
            //    theRelevantQueueName,   e =>
            //    e.Consumer<RelevantConsumer>(serviceProvider));

            //bus.ConnectConsumer(() => new RelevantConsumer());

            return bus;
        }

        void ConfigureMassTransit(IServiceCollectionConfigurator configurator)
        {
            configurator.AddBus(CreateBus);


            //configurator.AddConsumer<RelevantConsumer>();

            //EndpointConvention.Map<RelevantConsumerMessage>(new Uri(
            //    $"{hostSettings.Address}/{theRelevantQueueName}"));

            //services.AddSingleton<IRelevantConsumer, RelevantConsumer>();



            EndpointConvention.Map<xxx>(new Uri(
                $"{hostSettings.Address}/{xExchangeName}"));
            services.AddSingleton<IXProducer, XProducer>();

        }

        services.AddMassTransit(ConfigureMassTransit);            
        services.AddSingleton<IHostedService, TheBusService>();
    }
...