Поставьте в очередь асинхронные операции, чтобы они выполнялись по одному в приложении MVVM - PullRequest
0 голосов
/ 21 января 2019

Я сейчас создаю приложение MVVM, одна из моих моделей представлений использует Сервис, зарегистрированный с внедрением зависимостей. Эта служба запускает команды Powerhell командлета или http REST для различных сторонних приложений, которые не очень рады, когда получают несколько запросов одновременно.

Вот почему я хотел бы иметь возможность запускать несколько операций из пользовательского интерфейса (не блокируя его), но следя за тем, чтобы служба обрабатывала только по одной за раз. Мои элементы пользовательского интерфейса покажут, если они работают или ожидают.

Я пытался реализовать TPL ActionBlock, но до сих пор все мои операции выполнялись одновременно, и единственный способ заставить их работать в очереди блокирует пользовательский интерфейс до тех пор, пока все задачи не будут выполнены.

Вот что я сделал:

Моя модель представления содержит коллекцию ObservableCollection элементов, которая содержит два списка (один вложен в другой). В пользовательском интерфейсе он выглядит как список элементов, который можно развернуть, чтобы показать небольшое древовидное представление.

Я хочу, чтобы каждый раз, когда я раскрываю элемент, все вложенные элементы в древовидном представлении проверяли свой статус в стороннем приложении через службу. Метод в подпункте пользовательского интерфейса выглядит следующим образом:

private async Task<bool> UpdateSubItemsStatus()
    {
        foreach (var item in connectorsMenuItems)
        {
            await parent.Library.EnqueueConnectorOperations(Connectors.service.OperationType.CheckPresence, parent.CI, AssetId, item.ConnectorID, parent.ConnectorInterfaces.Single(c => c.ItemId == AssetId).ItemsConnectorPresence.Single(i => i.ConnectorId == item.ConnectorID));
        }
        return true;
    }

Здесь «parent» - это элемент 1-го уровня, а «parent.Library» - модель основного вида, в которой размещается все.

На модели View методы, которые получают это, следующие:

public async Task EnqueueConnectorOperations(OperationType operationType, ConfigurationItem ci, Guid itemId, Guid ConnectorID, ItemConnectorPresence itemPresence)
    {
        logManager.WriteLog($"Library : Received connector operation for item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
        //Set requestor UI item in working state in the UI
        if(ci.CIType == EnumCIType.Application)
        {
            LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).IsWorking = true;
            LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).Status = LibraryItemState.UpdatingStatus;
            LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).StatusString = "Checking Presence";
        }

        ActionBlock<OperationType> actionBlock = new ActionBlock<OperationType>(async _operationType =>
        {
            logManager.WriteLog($"Library : Sending the operation to connector service : item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
            await connectorService.EnqueueConnectorOperations(operationType, ci, itemId, Settings.Default.ProjectLocalPath + @"\" + ci.ID.ToString(), ConnectorID, Settings.Default.DisplayLanguage, Settings.Default.ProjectLocalPath, itemPresence).ConfigureAwait(false);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            CancellationToken = new CancellationTokenSource(TimeSpan.FromMinutes(5)).Token,
        });

        actionBlock.Post(operationType);
        actionBlock.Complete();
        actionBlock.Completion.Wait();
    }

Тогда служба, названная здесь «connectorService», выполняет свою работу.

Здесь, в последней строке, если я использую actionBlock.Completion.Wait (), все задачи выполняются последовательно, мой пользовательский интерфейс заблокирован.

Если я использую вместо этого, ожидаем actionBlock.Completion (). Пользовательский интерфейс не заблокирован, но все работает параллельно.

Так что, если бы у кого-то был совет, было бы здорово!

ОБНОВЛЕНИЕ:

Я адаптировал ответчик JSteward для своих нужд:

Я объявил ActionBlock, как вы рекомендовали, в качестве частного члена моей модели представления. Но когда я сделал, как вы сказали, когда я израсходовал элемент, он функционировал там, где очередь стояла правильно, но если я развернул другой элемент, то его операции (которые также были в их очереди) выполнялись параллельно операциям первого элемента. Что не является поведением, я ожидаю только одну операцию за раз, независимо от того, сколько элементов запрашивают.

Итак, я сделал следующие изменения: ActionBlock инициализируется в конструкторе модели представления раз и навсегда:

public ViewModelCtor()
{
 actionBlock = new ActionBlock<ConnectorOperationArgWrapper>(async _connectorOperationArgWrapper =>
            {
                logManager.WriteLog($"Library : Sending the operation to connector service for {_connectorOperationArgWrapper.itemPresence.ItemName} on connector {connectorService.GetConnectorName(_connectorOperationArgWrapper.itemPresence.ConnectorId)}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogLevel.Information);
                LibraryItems.Single(l => l.CI.ID == _connectorOperationArgWrapper.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _connectorOperationArgWrapper.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _connectorOperationArgWrapper.itemPresence.ConnectorId).StatusString = "Cheking Presence";
                LibraryItems.Single(l => l.CI.ID == _connectorOperationArgWrapper.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _connectorOperationArgWrapper.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _connectorOperationArgWrapper.itemPresence.ConnectorId).Status = LibraryItemState.UpdatingStatus;
                await connectorService.EnqueueConnectorOperations(_connectorOperationArgWrapper.operationType, _connectorOperationArgWrapper.ci, _connectorOperationArgWrapper.itemPresence.itemId, Settings.Default.ProjectLocalPath + @"\" + _connectorOperationArgWrapper.ci.ID.ToString(), _connectorOperationArgWrapper.itemPresence.ConnectorId, Settings.Default.DisplayLanguage, Settings.Default.ProjectLocalPath, _connectorOperationArgWrapper.itemPresence).ConfigureAwait(false);
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
            });
}

Итак, вызов методов по элементам, который теперь расширяется, выглядит следующим образом:

public async Task EnqueueConnectorOperations(ConnectorOperationArgWrapper _args)
    {

        logManager.WriteLog($"Library : Received operation request for {_args.itemPresence.ItemName} on connector {connectorService.GetConnectorName(_args.itemPresence.ConnectorId)}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogLevel.Information);

        if (_args.ci.CIType == EnumCIType.Application)
        {
            LibraryItems.Single(l => l.CI.ID == _args.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _args.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _args.itemPresence.ConnectorId).IsWorking = true;
            LibraryItems.Single(l => l.CI.ID == _args.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _args.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _args.itemPresence.ConnectorId).Status = LibraryItemState.NeedsAttention;
            LibraryItems.Single(l => l.CI.ID == _args.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _args.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _args.itemPresence.ConnectorId).StatusString = "Waiting";
        }

        logManager.WriteLog($"Library : post actionblock", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogLevel.Information);
        await actionBlock.SendAsync(_args);

        //actionBlock.Complete();
        //await actionBlock.Completion;
    }

Я прокомментировал часть с завершением и завершением actionBlock, потому что я хочу, чтобы блок мог получать и ставить в очередь запрос в любое время и даже, может быть, несколько раз за элемент.

Пока что это похоже на работу, правильно ли это делать, как я, или я столкнусь с некоторыми проблемами с этим?

Ответы [ 2 ]

0 голосов
/ 22 января 2019

Использование BlockingCollection с TaskCompletionSource

    using System;
    using System.Collections;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Threading.Tasks;

    namespace test
    {
        class Program
        {
            class WorkItem
            {
                public int Id { get; set; } // you can make it store more things
                public TaskCompletionSource<DateTime> TaskSource { get; } = new TaskCompletionSource<DateTime>();
            }

            class Worker : IDisposable
            {
                private BlockingCollection<WorkItem> _queue;
                private Task _consumer;

                public Worker()
                {
                    _queue = new BlockingCollection<WorkItem>();

                    _consumer = Task.Run(async () =>
                    {
                        foreach (var item in _queue.GetConsumingEnumerable())
                        {
                            await Task.Delay(1000); // some hard work
                            item.TaskSource.TrySetResult(DateTime.Now); // try is safer
// you can return whatever you want
                        }
                    });
                }

                public Task<DateTime> DoWork(int i) // return whatever you want
                {
                    var workItem = new WorkItem { Id = i };
                    _queue.Add(workItem);

                    return workItem.TaskSource.Task;
                }
                public void Dispose()
                {
                    _queue.CompleteAdding();
                }
            }

            public static void Main(string[] args)
            {
                using (var worker = new Worker())
                {
                    Task.Run(async () =>
                    {
                        var tasks = Enumerable.Range(0,10).Select(x => worker.DoWork(x)).ToArray();

                        var time = await tasks[1];
                        Console.WriteLine("2nd task finished at " + time);

                        foreach (var task in tasks)
                        {
                            time = await task;
                            Console.WriteLine("Task finished at " + time);
                        }

                        Console.ReadLine();
                    }).Wait();
                }



            }
        }
    }
    // output
    2nd task finished at 2019-01-22 19:14:57
    Task finished at 2019-01-22 19:14:56
    Task finished at 2019-01-22 19:14:57
    Task finished at 2019-01-22 19:14:58
    Task finished at 2019-01-22 19:14:59
    Task finished at 2019-01-22 19:15:00
    Task finished at 2019-01-22 19:15:01
    Task finished at 2019-01-22 19:15:02
    Task finished at 2019-01-22 19:15:03
    Task finished at 2019-01-22 19:15:04
    Task finished at 2019-01-22 19:15:05

Это позволяет легко ожидать один элемент внутри команды, например, без блокировки потока пользовательского интерфейса.

0 голосов
/ 21 января 2019

Прямо сейчас вы создаете новый ActionBlock для каждой операции.У ActionBlock есть внутренняя очередь, в которую вы должны отправлять сообщения, и позволить ей запускать их последовательно, используя один ActionBlock.Переставив вещи и сделав ActionBlock членом класса, вы сможете намного лучше управлять им и ожидать каждой группы элементов подпредставления.

private ActionBlock<OperationType> actionBlock;

public void OnTreeViewExpand()
{
    //Re-initialize the actionblock for a new set of operations
    actionBlock = new ActionBlock<OperationType>(async _operationType =>
    {
        logManager.WriteLog($"Library : Sending the operation to connector service : item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
        await connectorService.EnqueueConnectorOperations(operationType, ci, itemId, Settings.Default.ProjectLocalPath + @"\" + ci.ID.ToString(), ConnectorID, Settings.Default.DisplayLanguage, Settings.Default.ProjectLocalPath, itemPresence).ConfigureAwait(false);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 1,
        CancellationToken = new CancellationTokenSource(TimeSpan.FromMinutes(5)).Token,
    });
}

private async Task<bool> UpdateSubItemsStatus()
{
    foreach (var item in connectorsMenuItems)
    {
        await parent.Library.EnqueueConnectorOperations(Connectors.service.OperationType.CheckPresence, parent.CI, AssetId, item.ConnectorID, parent.ConnectorInterfaces.Single(c => c.ItemId == AssetId).ItemsConnectorPresence.Single(i => i.ConnectorId == item.ConnectorID));
    }

    //All items sent, signal completion
    actionBlock.Complete();
    await actionBlock.Completion;
    return true;
}

public Task EnqueueConnectorOperations(OperationType operationType, ConfigurationItem ci, Guid itemId, Guid ConnectorID, ItemConnectorPresence itemPresence)
{
    logManager.WriteLog($"Library : Received connector operation for item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
    //Set requestor UI item in working state in the UI
    if (ci.CIType == EnumCIType.Application)
    {
        LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).IsWorking = true;
        LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).Status = LibraryItemState.UpdatingStatus;
        LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).StatusString = "Checking Presence";
    }
    return actionBlock.SendAsync(operationType);
}
...