Похоже, вы неправильно поняли концепцию конечных точек, потребителей и соглашений.
Я очень обеспокоен тем, что вы используете LoadFrom
для каждой конечной точки. Это означает, что все ваших потребителей, зарегистрированных в контейнере, будут прослушивать в каждой конечной точке.
Обычно, если вы хотите разделить потребителей по конечной точке, вам необходимо явно настроить конечную точку, вызвав ep.Consumer<MyConsumer>(provider)
.
cfg.ReceiveEndpoint(host, "some_endp_2", e =>
{
e.Consumer<SomeConsumer>(provider);
});
cfg.ReceiveEndpoint(host, "some_endp_2", e =>
{
e.Consumer<SomeOtherConsumer>(provider);
});
Когда вы используете LoadFrom
, каждая из ваших конечных точек будет иметь всех потребителей из контейнера. Если вы используете LoadFrom
для каждой конечной точки, каждая конечная точка будет подписана на все ваши сообщения, и вы получите каждое сообщение столько раз, сколько таких конечных точек. Определенно, это не то, что вы хотите.
Вы, вероятно, неправильно поняли значение EndpointConventions
. Соглашения используются только для отправки сообщений, а не для получения сообщений. Конечные точки будут получать все сообщения, которые он может потреблять.
Решение 1
Если вы не ожидаете большого трафика, вы можете поместить всех потребителей в одну конечную точку, а затем использовать LoadFrom
.
cfg.ReceiveEndpoint(host, "my_service", e =>
{
e.LoadFrom(provider);
});
Решение 2:
Если вы хотите разделить потребителей по одному потребителю для каждой конечной точки и использовать что-то похожее на конфигурацию конечной точки в одну линию, это легко сделать с помощью следующего кода:
using System;
using MassTransit.RabbitMqTransport;
namespace MassTransit.TestCode
{
public static class BusConfigurationExtensions
{
public static void ConfigureEndpoint<T>(this IRabbitMqBusFactoryConfigurator cfg,
IRabbitMqHost host, string endpointName, IServiceProvider provider)
where T : class, IConsumer
=> cfg.ReceiveEndpoint(host, endpointName, ep => ep.Consumer<T>(provider));
}
}
Затем вы можете использовать его как:
cfg.ConfigureEndpoint<SubmitOrderConsumer>(host, "submit_order", provider);
cfg.ConfigureEndpoint<MarkOrderAsPaidConsumer>(host, "mark_paid", provider);
cfg.ConfigureEndpoint<ShipOrderConsumer>(host, "ship_order", provider);
При использовании этого расширения вы не должны использовать AddMassTransit
метод MassTransit.Extensions.DependencyInjection
, но вам необходимо зарегистрировать все ваши потребительские зависимости в наборе сервисов.
Однако я не вижу смысла в наличии такого расширения, если вы все еще можете использовать этот однострочный код в коде конфигурации шины.
cfg.ReceiveEndpoint(host, endpointName, ep => ep.Consumer<SubmitOrderConsumer>(provider));
cfg.ReceiveEndpoint(host, endpointName, ep => ep.Consumer<MarkOrderAsPaidConsumer>(provider));
cfg.ReceiveEndpoint(host, endpointName, ep => ep.Consumer<ShipOrderConsumer>(provider));
Тот факт, что в моем первом фрагменте я использовал группы методов, не означает, что вы не можете просто использовать выражение лямбда.
В следующем выпуске появится новый пакет MassTransit.AspNetCore
для лучшей интеграции MassTransit с ASP.NET Core. Он настроит хостинг шины, правильно зарегистрирует экземпляр шины, а также применит логирование. Тогда конфигурация будет выглядеть так (этот код работает, я только что написал и протестировал его):
public void ConfigureServices(IServiceCollection services)
{
services.AddMassTransit(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost"), h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint(host, "message_one", ep => ep.Consumer<MessageOneConsumer>(provider));
cfg.ReceiveEndpoint(host, "message_two", ep => ep.Consumer<MessageTwoConsumer>(provider));
}));
}
Помните, что каждая конечная точка будет иметь свою очередь. Если вы только начинаете свою работу и хотите попробовать свои силы, вы можете выбрать первое решение, чтобы у вас была одна очередь. Со временем вы можете разделить свои конечные точки.