Как получить весь идентификатор сеанса очереди в служебной шине Azure .Net Core - PullRequest
0 голосов
/ 28 марта 2019

Я делаю веб-приложение, которое подключено к Azure с помощью .Net Core 2.1.1.Я столкнулся с некоторой проблемой, связанной с очередью служебной шины, когда я пытаюсь получить sessionid связанной очереди.

Я нашел некоторый код, но он не поддерживается .Net Core.Ниже приведен код:

var queueClient = QueueClient.CreateFromConnectionString(AppSettings.ServiceBusConnection, queueName);
var sessions = await queueClient.GetMessageSessionsAsync();
return sessions;

Я также уже пробовал эту функцию,

var connString = Configuration.GetConnectionString("servicebus");
sessionClient = new SessionClient(connString, queue,ReceiveMode.PeekLock);
List<IMessageSession> sessions=new List<IMessageSession>();
while (true)
{
    var session = await sessionClient.AcceptMessageSessionAsync();             
    if (session == null)
        break;
    sessions.Add(session);
}
return sessions;
}

Но она продолжает давать мне Timeout Exception.Кто-нибудь может мне помочь?

1 Ответ

0 голосов
/ 29 марта 2019

Это то, что я попробовал, и это сработало для меня. Пожалуйста, проверьте скриншот ниже

enter image description here

Вот код, который я пробовал

using System;

namespace Core.SBConsole
{
    using Microsoft.Azure.ServiceBus;
    using Microsoft.Azure.ServiceBus.Core;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;

    class Program
    {
        // Connection String for the namespace can be obtained from the Azure portal under the 
        // 'Shared Access policies' section.
        const string ServiceBusConnectionString = "{Connection String}";
        const string QueueName = "mvq";
        static IMessageSender messageSender;
        static ISessionClient sessionClient;
        const string SessionPrefix = "session-prefix";

        static void Main(string[] args)
        {
            MainAsync().GetAwaiter().GetResult();
        }

        static async Task MainAsync()
        {
            const int numberOfSessions = 5;
            const int numberOfMessagesPerSession = 3;

            messageSender = new MessageSender(ServiceBusConnectionString, QueueName);
            sessionClient = new SessionClient(ServiceBusConnectionString, QueueName);

            // Send messages with sessionId set
            await SendSessionMessagesAsync(numberOfSessions, numberOfMessagesPerSession);

            // Receive all Session based messages using SessionClient
            await ReceiveSessionMessagesAsync(numberOfSessions, numberOfMessagesPerSession);

            Console.WriteLine("=========================================================");
            Console.WriteLine("Completed Receiving all messages... Press any key to exit");
            Console.WriteLine("=========================================================");

            Console.ReadKey();

            await messageSender.CloseAsync();
            await sessionClient.CloseAsync();
        }

        static async Task ReceiveSessionMessagesAsync(int numberOfSessions, int messagesPerSession)
        {
            Console.WriteLine("===================================================================");
            Console.WriteLine("Accepting sessions in the reverse order of sends for demo purposes");
            Console.WriteLine("===================================================================");

            for (int i = 0; i < numberOfSessions; i++)
            {
                int messagesReceivedPerSession = 0;

                // AcceptMessageSessionAsync(i.ToString()) as below with session id as parameter will try to get a session with that sessionId.
                // AcceptMessageSessionAsync() without any messages will try to get any available session with messages associated with that session.
                IMessageSession session = await sessionClient.AcceptMessageSessionAsync();// (SessionPrefix + i.ToString());

                if (session != null)
                {
                    // Messages within a session will always arrive in order.
                    Console.WriteLine("=====================================");
                    Console.WriteLine($"Received Session: {session.SessionId}");

                    while (messagesReceivedPerSession++ < messagesPerSession)
                    {
                        Message message = await session.ReceiveAsync();

                        Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");

                        // Complete the message so that it is not received again.
                        // This can be done only if the queueClient is created in ReceiveMode.PeekLock mode (which is default).
                        await session.CompleteAsync(message.SystemProperties.LockToken);
                    }

                    Console.WriteLine($"Received all messages for Session: {session.SessionId}");
                    Console.WriteLine("=====================================");

                    // Close the Session after receiving all messages from the session
                    await session.CloseAsync();
                }
            }
        }

        static async Task SendSessionMessagesAsync(int numberOfSessions, int messagesPerSession)
        {
            if (numberOfSessions == 0 || messagesPerSession == 0)
            {
                await Task.FromResult(false);
            }

            for (int i = numberOfSessions - 1; i >= 0; i--)
            {
                var messagesToSend = new List<Message>();
                string sessionId = SessionPrefix + i;
                for (int j = 0; j < messagesPerSession; j++)
                {
                    // Create a new message to send to the queue
                    string messageBody = "test" + j;
                    var message = new Message(Encoding.UTF8.GetBytes(messageBody));
                    // Assign a SessionId for the message
                    message.SessionId = sessionId;
                    messagesToSend.Add(message);

                    // Write the sessionId, body of the message to the console
                    Console.WriteLine($"Sending SessionId: {message.SessionId}, message: {messageBody}");
                }

                // Send a batch of messages corresponding to this sessionId to the queue
                await messageSender.SendAsync(messagesToSend);
            }

            Console.WriteLine("=====================================");
            Console.WriteLine($"Sent {messagesPerSession} messages each for {numberOfSessions} sessions.");
            Console.WriteLine("=====================================");
        }
    }
}

Что нужно учесть перед созданием очереди

1) Убедитесь, что служебная шина не находится на свободном или базовом уровне, если да, то масштабируйте ее до Standadrd

2) СделайтеОбязательно включите сеанс во время создания очереди.

Я использую Microsoft.Azure.ServiceBus nuget pakcage 3.4, который является самым последним. Если вы используете какой-либо другой пакет, попробуйте обновить или понизить его.

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...