Azure ServiceBus Тема с несколькими подписчиками - повторяющиеся сообщения - PullRequest
0 голосов
/ 22 сентября 2019

У меня есть тема Azure ServiceBus под названием «IntegrationEvent» с одной подпиской под названием «TestSubscription».Я зарегистрировал двух подписчиков на эту подписку, которые у меня есть CompetingConsumers .

Многие сообщения обрабатываются в обоих подписчиках.Что мне нужно изменить, чтобы этого больше не происходило?Я думал, что каждое сообщение должно обрабатываться только одним подписчиком?

Отправитель:

class Program
{
    static async Task Main(string[] args)
    {
        await SendMessageAsync();
    }

    private static async Task SendMessageAsync()
    {
        var sendClient = new TopicClient("ConnectionString");

        var testBlock = new ActionBlock<string>(
            async id =>
            {
                string jsonMessage = JsonConvert.SerializeObject(id);
                byte[] body = Encoding.UTF8.GetBytes(jsonMessage);

                var messageToSend = new Message(body)
                {
                    CorrelationId = id,
                };

                await sendClient.SendAsync(messageToSend);

            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 25
            });

        for (int i = 0; i < 10000; i++)
        {
            testBlock.Post(Guid.NewGuid().ToString());
        }

        testBlock.Complete();
        await testBlock.Completion;
    }
}

Я использую два подписчика / получателя ( не подписка), слушающих IntegrationEvent.

class Program
{
    static SubscriptionClient subscriptionClient;

    static async Task Main(string[] args)
    {
        var builder = new ServiceBusConnectionStringBuilder("ConnectionString");
        if (string.IsNullOrWhiteSpace(builder.EntityPath))
        {
            builder.EntityPath = "IntegrationEvent";
        }

        subscriptionClient = new SubscriptionClient(builder, "TestSubscription");

        await subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName);
        await subscriptionClient.AddRuleAsync(new RuleDescription(RuleDescription.DefaultRuleName, new TrueFilter()));

        ListenForMessages();

        Console.Read();
    }

    protected static void ListenForMessages()
    {
        var options = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            AutoComplete = false,
            MaxConcurrentCalls = 10
        };
        subscriptionClient.RegisterMessageHandler(ReceiveMessageAsync, options);
    }

    private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs arg)
    {
        return Task.CompletedTask;
    }

    private static async Task ReceiveMessageAsync(Message arg1, CancellationToken arg2)
    {
        string integrationEvent = Encoding.UTF8.GetString(arg1.Body);
        Console.WriteLine($"{ arg1.MessageId}, { arg1.CorrelationId}, {integrationEvent}");
        await subscriptionClient.CompleteAsync(arg1.SystemProperties.LockToken);
    }
}

1 Ответ

0 голосов
/ 23 сентября 2019

Важно понять , как работают подписки, прежде чем пытаться что-либо предпринять.Об этом есть документация .В частности, правила , которые влияют на то, что будут получать подписки.

Прямо сейчас ваш код настроен на получение всех сообщений (перехватить все правила).Как только вы создадите более конкретные правила, используя фильтры SQL или корреляции, вы сможете контролировать, что получит каждая отдельная подписка.Некоторое время назад я написал пост , в котором содержится некоторая информация.

...