Есть ли в .NET 4 коллекция Threadsafe Observable? - PullRequest
21 голосов
/ 10 октября 2010

Платформа: WPF, .NET 4.0, C# 4.0

Проблема: в Mainwindow.xaml у меня есть ListBox, связанный с коллекцией Customer, которая в настоящее время является ObservableCollection .

ObservableCollection<Customer> c = new ObservableCollection<Customer>();

Эта коллекция может обновляться через несколько источников, таких как FileSystem, WebService и т. Д.

Чтобы разрешить параллельную загрузку клиентов, я создал вспомогательный класс

public class CustomerManager(ref ObsevableCollection<Customer> cust)

, который внутренне порождает новую задачу (из библиотеки расширений Parallel) для каждого источника клиента и добавляет новый экземпляр Customer в объект коллекции клиентов (передается с помощью ref в его ctor).

Проблема в том, что ObservableCollection (или любая коллекция в этом отношении) не может использоваться из вызовов, отличных от потока пользовательского интерфейса, и возникает исключение:

"NotSupportedException - этот тип CollectionView не поддерживает изменения в его SourceCollection из потока, отличного от потока Dispatcher."

Я пытался использовать

System.Collections.Concurrent.ConcurrentBag<Customer>

коллекция, но она не реализует интерфейс INotifyCollectionChanged. Следовательно, мой интерфейс WPF не будет обновляться автоматически.

Итак, есть ли класс коллекции, который реализует как уведомления об изменениях свойств / коллекций, так и разрешает вызовы из других потоков, не относящихся к пользовательскому интерфейсу? По моему первоначальному бингу / поиску в Google ничего не предусмотрено из коробки.

Редактировать: я создал свою собственную коллекцию, которая наследуется от ConcurrentBag , а также реализует интерфейс INotifyCollectionChanged . Но, к моему удивлению, даже после вызова его в отдельных задачах пользовательский интерфейс WPF зависает до тех пор, пока задача не будет завершена. Разве задачи не должны выполняться параллельно и не блокировать поток пользовательского интерфейса ?

Заранее спасибо за любые предложения.

Ответы [ 4 ]

6 голосов
/ 26 июня 2013

Есть два возможных подхода.Первый будет наследовать от параллельной коллекции и добавить функциональность INotifyCollectionChanged, а второй будет наследовать от коллекции, которая реализует INotifyCollectionChanged, и добавить поддержку параллелизма.Я думаю, что гораздо проще и безопаснее добавить поддержку INotifyCollectionChanged в параллельную коллекцию.Мое предложение ниже.

Это выглядит долго, но большинство методов просто вызывают внутреннюю параллельную коллекцию, как если бы вызывающая сторона использовала ее напрямую.Горстка методов, которые добавляют или удаляют из коллекции, внедряют вызов частного метода, который вызывает событие уведомления у диспетчера, предоставляемого при построении, что позволяет классу быть потокобезопасным, но гарантирует, что уведомления генерируются в одном потоке всевремя.

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Runtime.InteropServices;
using System.Security.Permissions;
using System.Windows.Threading;

namespace Collections
{
    /// <summary>
    /// Concurrent collection that emits change notifications on a dispatcher thread
    /// </summary>
    /// <typeparam name="T">The type of objects in the collection</typeparam>
    [Serializable]
    [ComVisible(false)]
    [HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
    public class ObservableConcurrentBag<T> : IProducerConsumerCollection<T>,
        IEnumerable<T>, ICollection, IEnumerable
    {
        /// <summary>
        /// The dispatcher on which event notifications will be raised
        /// </summary>
        private readonly Dispatcher dispatcher;

        /// <summary>
        /// The internal concurrent bag used for the 'heavy lifting' of the collection implementation
        /// </summary>
        private readonly ConcurrentBag<T> internalBag;

        /// <summary>
        /// Initializes a new instance of the ConcurrentBag<T> class that will raise <see cref="INotifyCollectionChanged"/> events
        /// on the specified dispatcher
        /// </summary>
        public ObservableConcurrentBag(Dispatcher dispatcher)
        {
            this.dispatcher = dispatcher;
            this.internalBag = new ConcurrentBag<T>();
        }

        /// <summary>
        /// Initializes a new instance of the ConcurrentBag<T> class that contains elements copied from the specified collection 
        /// that will raise <see cref="INotifyCollectionChanged"/> events on the specified dispatcher
        /// </summary>
        public ObservableConcurrentBag(Dispatcher dispatcher, IEnumerable<T> collection)
        {
            this.dispatcher = dispatcher;
            this.internalBag = new ConcurrentBag<T>(collection);
        }

        /// <summary>
        /// Occurs when the collection changes
        /// </summary>
        public event NotifyCollectionChangedEventHandler CollectionChanged;

        /// <summary>
        /// Raises the <see cref="CollectionChanged"/> event on the <see cref="dispatcher"/>
        /// </summary>
        private void RaiseCollectionChangedEventOnDispatcher(NotifyCollectionChangedEventArgs e)
        {
            this.dispatcher.BeginInvoke(new Action<NotifyCollectionChangedEventArgs>(this.RaiseCollectionChangedEvent), e);
        }

        /// <summary>
        /// Raises the <see cref="CollectionChanged"/> event
        /// </summary>
        /// <remarks>
        /// This method must only be raised on the dispatcher - use <see cref="RaiseCollectionChangedEventOnDispatcher" />
        /// to do this.
        /// </remarks>
        private void RaiseCollectionChangedEvent(NotifyCollectionChangedEventArgs e)
        {
            this.CollectionChanged(this, e);
        }

        #region Members that pass through to the internal concurrent bag but also raise change notifications

        bool IProducerConsumerCollection<T>.TryAdd(T item)
        {
            bool result = ((IProducerConsumerCollection<T>)this.internalBag).TryAdd(item);
            if (result)
            {
                this.RaiseCollectionChangedEventOnDispatcher(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item));
            }
            return result;
        }

        public void Add(T item)
        {
            this.internalBag.Add(item);
            this.RaiseCollectionChangedEventOnDispatcher(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item));
        }

        public bool TryTake(out T item)
        {
            bool result = this.TryTake(out item);
            if (result)
            {
                this.RaiseCollectionChangedEventOnDispatcher(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Remove, item));
            }
            return result;
        }

        #endregion

        #region Members that pass through directly to the internal concurrent bag

        public int Count
        {
            get
            {
                return this.internalBag.Count;
            }
        }

        public bool IsEmpty
        {
            get
            {
                return this.internalBag.IsEmpty;
            }
        }

        bool ICollection.IsSynchronized
        {
            get
            {
                return ((ICollection)this.internalBag).IsSynchronized;
            }
        }

        object ICollection.SyncRoot
        {
            get
            {
                return ((ICollection)this.internalBag).SyncRoot;
            }
        }

        IEnumerator<T> IEnumerable<T>.GetEnumerator()
        {
            return ((IEnumerable<T>)this.internalBag).GetEnumerator();
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable)this.internalBag).GetEnumerator();
        }

        public T[] ToArray()
        {
            return this.internalBag.ToArray();
        }

        void IProducerConsumerCollection<T>.CopyTo(T[] array, int index)
        {
            ((IProducerConsumerCollection<T>)this.internalBag).CopyTo(array, index);
        }

        void ICollection.CopyTo(Array array, int index)
        {
            ((ICollection)this.internalBag).CopyTo(array, index);
        }

        #endregion
    }
}
3 голосов
/ 08 мая 2016

Я целую вечность смотрел на все решения, и ни одно из них не соответствовало тому, что мне было нужно, пока я наконец не понял проблему: я не хотел список с безопасным потоком - я просто хотел список без потоков, который можно изменить в любом потоке, но это уведомило об изменениях в потоке пользовательского интерфейса.

(Причина, по которой нежелание использовать потокобезопасную коллекцию - обычная причина - часто вам нужно выполнить несколько операций, например, «если этого нет в списке, затем добавить его»«с какими списками безопасности потоков на самом деле не помогает, так что вы хотите сами управлять блокировкой).

Решение оказалось довольно простым по своей концепции и хорошо сработало для меня.Просто создайте новый класс списка, который реализует IList<T> и INotifyCollectionChanged.Делегируйте все вызовы, которые вам нужны, в базовую реализацию (например, List<T>), а затем при необходимости вызывайте уведомления в потоке пользовательского интерфейса.

public class AlbumList : IList<Album>, INotifyCollectionChanged
{
    private readonly IList<Album> _listImplementation = new List<Album>();

    public event NotifyCollectionChangedEventHandler CollectionChanged;

    private void OnChanged(NotifyCollectionChangedEventArgs e)
    {
        Application.Current?.Dispatcher.Invoke(DispatcherPriority.Render, 
                     new Action(() => CollectionChanged?.Invoke(this, e)));
    }

    public void Add(Album item)
    {
        _listImplementation.Add(item);
        OnChanged(new NotifyCollectionChangedEventArgs(
                      NotifyCollectionChangedAction.Add, item));
    }

    public bool Remove(Album item)
    {
        int index = _listImplementation.IndexOf(item);
        var removed = index >= 0;
        if (removed)
        {
            _listImplementation.RemoveAt(index);
            OnChanged(new NotifyCollectionChangedEventArgs(
                          NotifyCollectionChangedAction.Remove, item, index));
        }
        return removed;
    }
    // ...snip...
}
3 голосов
/ 17 июня 2013

Пожалуйста, посмотрите на BindableCollection<T> из Caliburn.Micro библиотеки:

/// <summary>
/// A base collection class that supports automatic UI thread marshalling.
/// </summary>
/// <typeparam name="T">The type of elements contained in the collection.</typeparam>
#if !SILVERLIGHT && !WinRT
[Serializable]
#endif
public class BindableCollection<T> : ObservableCollection<T>, IObservableCollection<T> {

    /// <summary>
    ///   Initializes a new instance of the <see cref = "Caliburn.Micro.BindableCollection{T}" /> class.
    /// </summary>
    public BindableCollection() {
        IsNotifying = true;
    }

    /// <summary>
    ///   Initializes a new instance of the <see cref = "Caliburn.Micro.BindableCollection{T}" /> class.
    /// </summary>
    /// <param name = "collection">The collection from which the elements are copied.</param>
    /// <exception cref = "T:System.ArgumentNullException">
    ///   The <paramref name = "collection" /> parameter cannot be null.
    /// </exception>
    public BindableCollection(IEnumerable<T> collection) : base(collection) {
        IsNotifying = true;
    }

#if !SILVERLIGHT && !WinRT
    [field: NonSerialized]
#endif
    bool isNotifying; //serializator try to serialize even autogenerated fields

    /// <summary>
    ///   Enables/Disables property change notification.
    /// </summary>
#if !WinRT
    [Browsable(false)]
#endif
    public bool IsNotifying {
        get { return isNotifying; }
        set { isNotifying = value; }
    }

    /// <summary>
    ///   Notifies subscribers of the property change.
    /// </summary>
    /// <param name = "propertyName">Name of the property.</param>
#if WinRT || NET45
    public virtual void NotifyOfPropertyChange([CallerMemberName]string propertyName = "") {
#else
    public virtual void NotifyOfPropertyChange(string propertyName) {
#endif
        if(IsNotifying)
            Execute.OnUIThread(() => OnPropertyChanged(new PropertyChangedEventArgs(propertyName)));
    }

    /// <summary>
    ///   Raises a change notification indicating that all bindings should be refreshed.
    /// </summary>
    public void Refresh() {
        Execute.OnUIThread(() => {
            OnPropertyChanged(new PropertyChangedEventArgs("Count"));
            OnPropertyChanged(new PropertyChangedEventArgs("Item[]"));
            OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset));
        });
    }

    /// <summary>
    ///   Inserts the item to the specified position.
    /// </summary>
    /// <param name = "index">The index to insert at.</param>
    /// <param name = "item">The item to be inserted.</param>
    protected override sealed void InsertItem(int index, T item) {
        Execute.OnUIThread(() => InsertItemBase(index, item));
    }

    /// <summary>
    ///   Exposes the base implementation of the <see cref = "InsertItem" /> function.
    /// </summary>
    /// <param name = "index">The index.</param>
    /// <param name = "item">The item.</param>
    /// <remarks>
    ///   Used to avoid compiler warning regarding unverifiable code.
    /// </remarks>
    protected virtual void InsertItemBase(int index, T item) {
        base.InsertItem(index, item);
    }

#if NET || WP8 || WinRT
/// <summary>
/// Moves the item within the collection.
/// </summary>
/// <param name="oldIndex">The old position of the item.</param>
/// <param name="newIndex">The new position of the item.</param>
    protected sealed override void MoveItem(int oldIndex, int newIndex) {
        Execute.OnUIThread(() => MoveItemBase(oldIndex, newIndex));
    }

    /// <summary>
    /// Exposes the base implementation fo the <see cref="MoveItem"/> function.
    /// </summary>
    /// <param name="oldIndex">The old index.</param>
    /// <param name="newIndex">The new index.</param>
    /// <remarks>Used to avoid compiler warning regarding unverificable code.</remarks>
    protected virtual void MoveItemBase(int oldIndex, int newIndex) {
        base.MoveItem(oldIndex, newIndex);
    }
#endif

    /// <summary>
    ///   Sets the item at the specified position.
    /// </summary>
    /// <param name = "index">The index to set the item at.</param>
    /// <param name = "item">The item to set.</param>
    protected override sealed void SetItem(int index, T item) {
        Execute.OnUIThread(() => SetItemBase(index, item));
    }

    /// <summary>
    ///   Exposes the base implementation of the <see cref = "SetItem" /> function.
    /// </summary>
    /// <param name = "index">The index.</param>
    /// <param name = "item">The item.</param>
    /// <remarks>
    ///   Used to avoid compiler warning regarding unverifiable code.
    /// </remarks>
    protected virtual void SetItemBase(int index, T item) {
        base.SetItem(index, item);
    }

    /// <summary>
    ///   Removes the item at the specified position.
    /// </summary>
    /// <param name = "index">The position used to identify the item to remove.</param>
    protected override sealed void RemoveItem(int index) {
        Execute.OnUIThread(() => RemoveItemBase(index));
    }

    /// <summary>
    ///   Exposes the base implementation of the <see cref = "RemoveItem" /> function.
    /// </summary>
    /// <param name = "index">The index.</param>
    /// <remarks>
    ///   Used to avoid compiler warning regarding unverifiable code.
    /// </remarks>
    protected virtual void RemoveItemBase(int index) {
        base.RemoveItem(index);
    }

    /// <summary>
    ///   Clears the items contained by the collection.
    /// </summary>
    protected override sealed void ClearItems() {
        Execute.OnUIThread(ClearItemsBase);
    }

    /// <summary>
    ///   Exposes the base implementation of the <see cref = "ClearItems" /> function.
    /// </summary>
    /// <remarks>
    ///   Used to avoid compiler warning regarding unverifiable code.
    /// </remarks>
    protected virtual void ClearItemsBase() {
        base.ClearItems();
    }

    /// <summary>
    ///   Raises the <see cref = "E:System.Collections.ObjectModel.ObservableCollection`1.CollectionChanged" /> event with the provided arguments.
    /// </summary>
    /// <param name = "e">Arguments of the event being raised.</param>
    protected override void OnCollectionChanged(NotifyCollectionChangedEventArgs e) {
        if (IsNotifying) {
            base.OnCollectionChanged(e);
        }
    }

    /// <summary>
    ///   Raises the PropertyChanged event with the provided arguments.
    /// </summary>
    /// <param name = "e">The event data to report in the event.</param>
    protected override void OnPropertyChanged(PropertyChangedEventArgs e) {
        if (IsNotifying) {
            base.OnPropertyChanged(e);
        }
    }

    /// <summary>
    ///   Adds the range.
    /// </summary>
    /// <param name = "items">The items.</param>
    public virtual void AddRange(IEnumerable<T> items) {
        Execute.OnUIThread(() => {
            var previousNotificationSetting = IsNotifying;
            IsNotifying = false;
            var index = Count;
            foreach(var item in items) {
                InsertItemBase(index, item);
                index++;
            }
            IsNotifying = previousNotificationSetting;

            OnPropertyChanged(new PropertyChangedEventArgs("Count"));
            OnPropertyChanged(new PropertyChangedEventArgs("Item[]"));
            OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset));
        });
    }

    /// <summary>
    ///   Removes the range.
    /// </summary>
    /// <param name = "items">The items.</param>
    public virtual void RemoveRange(IEnumerable<T> items) {
        Execute.OnUIThread(() => {
            var previousNotificationSetting = IsNotifying;
            IsNotifying = false;
            foreach(var item in items) {
                var index = IndexOf(item);
                if (index >= 0) {
                    RemoveItemBase(index);
                }
            }
            IsNotifying = previousNotificationSetting;

            OnPropertyChanged(new PropertyChangedEventArgs("Count"));
            OnPropertyChanged(new PropertyChangedEventArgs("Item[]"));
            OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset));
        });
    }

    /// <summary>
    /// Called when the object is deserialized.
    /// </summary>
    /// <param name="c">The streaming context.</param>
    [OnDeserialized]
    public void OnDeserialized(StreamingContext c) {
        IsNotifying = true;
    }

    /// <summary>
    /// Used to indicate whether or not the IsNotifying property is serialized to Xml.
    /// </summary>
    /// <returns>Whether or not to serialize the IsNotifying property. The default is false.</returns>
    public virtual bool ShouldSerializeIsNotifying() {
        return false;
    }
}

Источник

PS. Просто имейте в виду, что этот класс использует некоторые другие классы из Caliburn.Micro, чтобы вы могли либо копировать / переносить все зависимости самостоятельно, либо - если вы не используете какие-либо другие фреймворки приложений - просто ссылаться на двоичный файл библиотеки и передавать шанс.

0 голосов
/ 30 января 2014

Подробное объяснение и реализация здесь .Он был написан в основном для .NET 3.5 с пакетом обновления 1 (SP1), но все еще будет работать в версии 4.0.

Основная цель этой реализации - когда «реальный» список существует дольше, чем его связываемое представление (например, если этосвязано в окне, которое пользователь может открывать и закрывать).Если время жизни наоборот (например, вы обновляете список из фонового работника, который запускается только при открытом окне), тогда доступны несколько более простых конструкций.

...