Как создать внедрение зависимостей для очередей хранилища Azure для обработки каждого сообщения, как оно предоставляется - PullRequest
0 голосов
/ 28 ноября 2018

Попытка чтения из очереди Azure, в которую пишет какая-либо другая служба.Если я использую это в startup.cs

CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=*;AccountKey=*;EndpointSuffix=*");
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
CloudQueue queue = queueClient.GetQueueReference("*");
queue.CreateIfNotExists();
var message= queue.GetMessage();

, я могу получить сообщение в переменной 'message', но как вставить это при запуске, чтобы мой класс процессора вызывался с сообщением каждый раз, когда происходитновое сообщение в очереди.Я попытался добавить синглтон,

services.AddSinleton<ProcessorClassInterface>(x=> {return new ProcessorClass(queue)});

И затем вызывать queue.GetMessage через каждую 1 секунду.

1 Ответ

0 голосов
/ 05 марта 2019

Эта проблема была решена путем вызова функции, которая использует многопоточность для опроса очередей Azure по истечении заданного интервала времени и извлечения сообщений (возможно, с установленным экспоненциальным временем задержки).

Подход 1: Реализовать это в веб-приложении немного сложнее, и мне пришлось использовать хак - вызвать функцию из конструктора, чтобы начать опрос.

В файле startup.cs (внутри функции конфигурации) зарегистрируйте службу,

app.ApplicationServices.GetService<IQueueConsumer>();

В функции ConfigureServices, Конфигурирование и создание объекта класса очереди опроса,

services.TryAddTransient<IQueueConsumer>(sp => this.GetQueueProcessor(sp));

А затем, когда конструктор вызывается для создания объекта, начинаем опрашивать очередь в другом потоке.

public QueuePollingFunction(
        IOptions<QueueOptions> queueOptions,
        CloudQueue queue)
    {
        this.isEnabled = queueOptions.Value.IsEnabled;
        this.StartPollingQueue(queue);
    }

       public override async Task<bool> ProcessMessageAsync(string message)
    {
        bool result = false;
        try
        {
            var messageContent = JsonConvert.DeserializeObject<QueueEntity>(message);
            result = true;
        }
        catch (Exception e)
        {
            Trace.TraceError(e.ToString());
        }

        return result;
    }

    private async Task StartPollingQueue(CloudQueue queue)
    {
        if (this.isEnabled)
        {
            Task pollQueue = Task.Factory.StartNew(() => Parallel.For(0, this.numberOfParallelTasks, work =>
            {
                this.Start(queue);
            }));
        }
    }

    private async Task Start(CloudQueue queue)
    {
        while (true)
        {
            try
            {
                CloudQueueMessage retrievedMessage = await queue.GetMessageAsync();
                if (retrievedMessage != null)
                {
                    // Fail Logic
                    if (retrievedMessage.DequeueCount > this.maxRetryLimit)
                    {
                        await queue.DeleteMessageAsync(retrievedMessage);
                    }
                    bool isPass = await this.ProcessMessageAsync(newChannelSettings);
                    if (isPass)
                    {
                        await queue.DeleteMessageAsync(retrievedMessage);
                    }
                }
                else
                {
                    // If queue is empty, then the Task can sleep for sleepTime duration
                    await Task.Delay(this.sleepTime);
                }
            }
            catch (Exception e)
            {
                Trace.TraceError(e.ToString());
            }
        }
    }

Подход 2: Однако позже пришлось перейти на оптимальный подход, который должен использовать рабочие роли и затем использовать Задачи для запуска фонового потока для выполнения этой задачи.

...