Разделение сообщений по ключу маршрутизации в общественном транспорте - PullRequest
0 голосов
/ 26 марта 2020

Я хочу отправить сообщение в соответствующую очередь на основе ключа маршрутизации. Начиная с приложения производителя, мой код (соответствующие части):

var options = serviceProvider
    .GetService<IConfiguration>()
    .GetOptions<RabbitMqProducerOptions>("RabbitMqProducer");

foreach (var option in options?.Endpoints)
{
    var method = typeof(EndpointConvention).GetMethod("Map", new[] { typeof(Uri) });
    var type = Assembly.Load(option.Assembly).GetTypes().First(t => t.Name == option.Type);
    var genericMethod = method.MakeGenericMethod(new[] { type });

    genericMethod.Invoke(null, new[] { new Uri($"{options.Address}/{option.Name}") });
}

Bus.Factory.CreateUsingRabbitMq(cfg => {
    cfg.Host(options.Address);
    cfg.Send<CreateProducts>(x => x.UseRoutingKeyFormatter(context => context.Message.Platform));
});

С вышесказанным все в порядке - он создает обмены, как я их объявил в файле конфигурации (обмены разветвления, если это имеет значение). Теперь конфигурация потребителя:

var options = serviceProvider.GetService<IConfiguration>().GetOptions<RabbitMqConsumerOptions>("RabbitMqConsumer");

Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host(options.Address);

    foreach (var kvp in options.Endpoints)
    {
        cfg.ReceiveEndpoint("ingest-products", ep =>
        {
            ep.PrefetchCount = kvp.Value.PrefetchCount;
            ep.BindMessageExchanges = false;

            ep.UseMessageRetry(r => r.Interval(kvp.Value.RetryCount, kvp.Value.RetryInterval));
            ep.Bind("ingest-amazon-products", x => BindForEndpoint(x, kvp.Value.RoutingKey));
            BindExchange(ep, kvp.Value.RoutingKey, kvp.Value.Assembly, kvp.Value.Type);
            ep.ConfigureConsumers(serviceProvider);
        });
    }
});

Теперь приведенный выше код работает, но не так, как я предполагал, поскольку сообщения без соответствующих ключей маршрутизации все еще доставляются моим потребителям. Я имею в виду - если значение конфигурации kvp.Value.RoutingKey равно X, и производитель создает сообщение с ключом маршрутизации Y, потребитель, прослушивающий X, получит сообщение Y. Как это исправить?

1 Ответ

0 голосов
/ 03 апреля 2020

После публикации награды я обнаружил, что должен использовать метод Publish интерфейса BusFactoryConfigurator.

...