MassTransit: очередь не создана для обмена темами - PullRequest
0 голосов
/ 17 мая 2019

Я хочу отправить данные о ценах на акции на rabbitmq, используя topic exchange.Идея в том, что у меня есть обмен темами со следующим ключом маршрутизации:

<message-type>.<ticker>

Я смог сделать это с помощью собственного RabbitMQ cient, но я не могу понять, как это сделать с помощью MassTransit.

// setup topologies
rabbitCfg.Send<ComMessage>(x =>
{
   x.UseRoutingKeyFormatter(context => 
        $"quote.{context.Message.Ticker}");
});

rabbitCfg.Message<ComMessage>(x => x.SetEntityName("Quotes"));
rabbitCfg.Publish<ComMessage>(x =>
{
   x.ExchangeType = ExchangeType.Topic;
});


// setup reciever
rabbitCfg.ReceiveEndpoint(host, "MSFT", e =>
{
   e.Bind("Quotes", c =>
      {
         c.RoutingKey = "quote.MSFT";
         c.ExchangeType = ExchangeType.Topic;
      });

   e.Consumer<PriceConsumer>();
});

Отправить сообщение:

await _bus.Publish(new ComMessage
{
   Ticker = "MSFT",
   Price = "10"
});

Но это не работает.Очередь не создана, но обмен получает сообщения:

enter image description here

Где проблема?

1 Ответ

1 голос
/ 17 мая 2019

Я думаю, что вы забыли одну важную строчку. И для справки, я включил источник работающего юнит-теста с использованием тематических обменов.

В конечной точке получения вам необходимо отключить автоматическое связывание обмена.

cfg.ReceiveEndpoint(host, "MSFT", x =>
{
    x.BindMessageExchanges = false;
    ...
}

Рабочий пример показан ниже:

using System;
using System.Threading.Tasks;
using GreenPipes.Util;
using NUnit.Framework;
using RabbitMQ.Client;
using RoutingKeyTopic;


namespace RoutingKeyTopic
{
    public class Message
    {
        public Message(decimal price, string symbol)
        {
            Price = price;
            Symbol = symbol;
        }

        public string Symbol { get; set; }

        public decimal Price { get; set; }
    }
}


[TestFixture]
public class Using_a_routing_key_and_topic_exchange :
    RabbitMqTestFixture
{
    [Test]
    public async Task Should_support_routing_by_key_and_exchange_name()
    {
        var fooHandle = await Subscribe("MSFT");
        try
        {
            var barHandle = await Subscribe("UBER");
            try
            {
                await Bus.Publish(new Message(100.0m, "MSFT"));
                await Bus.Publish(new Message(3.50m, "UBER"));

                await Consumer.Microsoft;
                await Consumer.Uber;
            }
            finally
            {
                await barHandle.StopAsync(TestCancellationToken);
            }
        }
        finally
        {
            await fooHandle.StopAsync(TestCancellationToken);
        }
    }

    async Task<HostReceiveEndpointHandle> Subscribe(string key)
    {
        var queueName = $"Stock-{key}";
        var handle = Host.ConnectReceiveEndpoint(queueName, x =>
        {
            x.BindMessageExchanges = false;
            x.Consumer<Consumer>();

            x.Bind<Message>(e =>
            {
                e.RoutingKey = GetRoutingKey(key);
                e.ExchangeType = ExchangeType.Topic;
            });
        });

        await handle.Ready;

        return handle;
    }

    protected override void ConfigureRabbitMqBusHost(IRabbitMqBusFactoryConfigurator configurator, IRabbitMqHost host)
    {
        base.ConfigureRabbitMqBusHost(configurator, host);

        configurator.Message<Message>(x => x.SetEntityName(ExchangeName));
        configurator.Publish<Message>(x => x.ExchangeType = ExchangeType.Topic);

        configurator.Send<Message>(x => x.UseRoutingKeyFormatter(context => GetRoutingKey(context.Message.Symbol)));
    }

    string ExchangeName { get; } = "Quotes";

    string GetRoutingKey(string routingKey)
    {
        return $"quote.{routingKey}";
    }


    class Consumer :
        IConsumer<Message>
    {
        static readonly TaskCompletionSource<Message> _microsoft = new TaskCompletionSource<Message>();
        static readonly TaskCompletionSource<Message> _uber = new TaskCompletionSource<Message>();
        public static Task<Message> Microsoft => _microsoft.Task;
        public static Task<Message> Uber => _uber.Task;

        public Task Consume(ConsumeContext<Message> context)
        {
            Console.WriteLine($"Received {context.Message.Symbol} for {context.RoutingKey()}");

            if (context.Message.Symbol == "MSFT")
                _microsoft.TrySetResult(context.Message);

            if (context.Message.Symbol == "UBER")
                _uber.TrySetResult(context.Message);

            return TaskUtil.Completed;
        }
    }
}
...