У меня есть тема Azure ServiceBus под названием «IntegrationEvent» с одной подпиской под названием «TestSubscription».Я зарегистрировал двух подписчиков на эту подписку, которые у меня есть CompetingConsumers .
Многие сообщения обрабатываются в обоих подписчиках.Что мне нужно изменить, чтобы этого больше не происходило?Я думал, что каждое сообщение должно обрабатываться только одним подписчиком?
Отправитель:
class Program
{
static async Task Main(string[] args)
{
await SendMessageAsync();
}
private static async Task SendMessageAsync()
{
var sendClient = new TopicClient("ConnectionString");
var testBlock = new ActionBlock<string>(
async id =>
{
string jsonMessage = JsonConvert.SerializeObject(id);
byte[] body = Encoding.UTF8.GetBytes(jsonMessage);
var messageToSend = new Message(body)
{
CorrelationId = id,
};
await sendClient.SendAsync(messageToSend);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 25
});
for (int i = 0; i < 10000; i++)
{
testBlock.Post(Guid.NewGuid().ToString());
}
testBlock.Complete();
await testBlock.Completion;
}
}
Я использую два подписчика / получателя ( не подписка), слушающих IntegrationEvent.
class Program
{
static SubscriptionClient subscriptionClient;
static async Task Main(string[] args)
{
var builder = new ServiceBusConnectionStringBuilder("ConnectionString");
if (string.IsNullOrWhiteSpace(builder.EntityPath))
{
builder.EntityPath = "IntegrationEvent";
}
subscriptionClient = new SubscriptionClient(builder, "TestSubscription");
await subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName);
await subscriptionClient.AddRuleAsync(new RuleDescription(RuleDescription.DefaultRuleName, new TrueFilter()));
ListenForMessages();
Console.Read();
}
protected static void ListenForMessages()
{
var options = new MessageHandlerOptions(ExceptionReceivedHandler)
{
AutoComplete = false,
MaxConcurrentCalls = 10
};
subscriptionClient.RegisterMessageHandler(ReceiveMessageAsync, options);
}
private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs arg)
{
return Task.CompletedTask;
}
private static async Task ReceiveMessageAsync(Message arg1, CancellationToken arg2)
{
string integrationEvent = Encoding.UTF8.GetString(arg1.Body);
Console.WriteLine($"{ arg1.MessageId}, { arg1.CorrelationId}, {integrationEvent}");
await subscriptionClient.CompleteAsync(arg1.SystemProperties.LockToken);
}
}