Отслеживание WeakReference на объекты из нескольких потоков - PullRequest
1 голос
/ 12 января 2012

Я проектирую статическую шину сообщений, которая позволяла бы подписываться и публиковать сообщения произвольного типа.Чтобы не требовать от наблюдателей явной отписки, я бы хотел отслеживать объекты WeakReference, которые указывают на делегатов, а не на отслеживание самих делегатов.Я закончил тем, что кодировал нечто похожее на то, что Пол Стовелл описал в своем блоге http://www.paulstovell.com/weakevents.

Моя проблема заключается в следующем: в отличие от кода Пола, мои обозреватели подписываются на сообщения в одной теме, но сообщения могут публиковаться в другой,В этом случае я замечаю, что к тому времени, когда мне нужно уведомить наблюдателей, мои значения WeakReference.Target будут нулевыми, что указывает на то, что цели были собраны, хотя я точно знаю, что это не так.Проблема сохраняется как для коротких, так и для длинных слабых ссылок.

И наоборот, когда подписка и публикация выполняются из одного потока, код работает нормально.Последнее верно, даже если я на самом деле заканчиваю перечислять цели в новом потоке из ThreadPool, пока запрос изначально приходит из того же потока, на который я подписываюсь на сообщения.

Я понимаю, что этоочень специфический случай, поэтому любая помощь очень ценится.

Мой вопрос: не должен ли я иметь возможность надежного доступа к объектам WeakReference из нескольких потоков при условии правильной синхронизации потоков?Похоже, что я не могу, что не имеет большого смысла для меня.Итак, что я не делаю правильно?

Ответы [ 2 ]

0 голосов
/ 12 января 2012

Это поправка к моему предыдущему ответу. Я обнаружил, почему мой оригинальный код не работал, и эта информация может быть полезна для других. В моем исходном коде MessageBus был создан как Singleton:

public class MessageBus : Singleton<MessageBus> // Singleton<> is my library class

В приведенном выше примере он был объявлен как статический:

public static class MessageBus

Как только я преобразовал свой код в статический, все стало работать. Сказав это, я еще не мог понять, почему синглтон не работает.

0 голосов
/ 12 января 2012

Похоже, после сокращения моего кода до более простой формы (см. Ниже), теперь он работает нормально. Это означает, что проблема, которая привела к тому, что слабые эталонные цели собираются слишком рано, должна находиться в другом месте моего кода. Итак, чтобы ответить на мой собственный вопрос, кажется, что слабые ссылки могут быть безопасно доступны из нескольких потоков.

Вот мой тестовый код:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;


namespace Test
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Starting the app");
            Test test = new Test();
            // uncomment these lines to cause automatic unsubscription from Message1
//          test = null;
//          GC.Collect();
//          GC.WaitForPendingFinalizers();

            // publish Message1 on this thread
            // MessageBus.Publish<Message1>(new Message1());

            // publish Message1 on another thread
            ThreadPool.QueueUserWorkItem(delegate
            {
                MessageBus.Publish<Message1>(new Message1());
            });


            while (!MessageBus.IamDone)
            {
                Thread.Sleep(100);
            }
            Console.WriteLine("Exiting the app");
            Console.WriteLine("Press <ENTER> to terminate program.");
            Console.WriteLine();
            Console.ReadLine();
        }
    }

    public class Test
    {
        public Test()
        {
            Console.WriteLine("Subscribing to message 1.");
            MessageBus.Subscribe<Message1>(OnMessage1);
            Console.WriteLine("Subscribing to message 2.");
            MessageBus.Subscribe<Message2>(OnMessage2);
        }

        public void OnMessage1(Message1 message)
        {
            Console.WriteLine("Got message 1. Publishing message 2");
            MessageBus.Publish<Message2>(new Message2());
        }

        public void OnMessage2(Message2 message)
        {
            Console.WriteLine("Got message 2. Closing the app");
            MessageBus.IamDone = true;
        }
    }

    public abstract class MessageBase
    {
        public string Message;
    }

    public class Message1 : MessageBase
    {
    }

    public class Message2 : MessageBase
    {
    }

    public static class MessageBus
    {
        // This is here purely for this test
        public static bool IamDone = false;
        /////////////////////////////////////

        /// <summary>
        /// A dictionary of lists of handlers of messages by message type
        /// </summary>
        private static ConcurrentDictionary<string, List<WeakReference>> handlersDict = new ConcurrentDictionary<string, List<WeakReference>>();

        /// <summary>
        /// Thread synchronization object to use with Publish calls
        /// </summary>
        private static object _lockPublishing = new object();

        /// <summary>
        /// Thread synchronization object to use with Subscribe calls
        /// </summary>
        private static object _lockSubscribing = new object();

        /// <summary>
        /// Creates a work queue item that encapsulates the provided parameterized message
        /// and dispatches it.
        /// </summary>
        /// <typeparam name="TMessage">Message argument type</typeparam>
        /// <param name="message">Message argument</param>
        public static void Publish<TMessage>(TMessage message)
            where TMessage : MessageBase
        {
            // create the dictionary key
            string key = String.Empty;
            key = typeof(TMessage).ToString();
            // initialize a queue work item argument as a tuple of the dictionary type key and the message argument
            Tuple<string, TMessage, Exception> argument = new Tuple<string, TMessage, Exception>(key, message, null);
            // push the message on the worker queue
            ThreadPool.QueueUserWorkItem(new WaitCallback(_PublishMessage<TMessage>), argument);
        }

        /// <summary>
        /// Publishes a message to the bus, causing observers to be invoked if appropriate.
        /// </summary>
        /// <typeparam name="TArg">Message argument type</typeparam>
        /// <param name="stateInfo">Queue work item argument</param>
        private static void _PublishMessage<TArg>(Object stateInfo)
            where TArg : class
        {
            try
            {
                // translate the queue work item argument to extract the message type info and
                // any arguments
                Tuple<string, TArg, Exception> arg = (Tuple<string, TArg, Exception>)stateInfo;
                // call all observers that have registered to receive this message type in parallel
                Parallel.ForEach(handlersDict.Keys
                    // find the right dictionary list entry by message type identifier
                    .Where(handlerKey => handlerKey == arg.Item1)
                    // dereference the list entry by message type identifier to get a reference to the observer
                    .Select(handlerKey => handlersDict[handlerKey]), (handlerList, state) =>
                    {
                        lock (_lockPublishing)
                        {
                            List<int> descopedRefIndexes = new List<int>(handlerList.Count);
                            // search the list of references and invoke registered observers
                            foreach (WeakReference weakRef in handlerList)
                            {
                                // try to obtain a strong reference to the target
                                Delegate dlgRef = (weakRef.Target as Delegate);
                                // check if the underlying delegate reference is still valid
                                if (dlgRef != null)
                                {
                                    // yes it is, get the delegate reference via Target property, convert it to Action and invoke the observer
                                    try
                                    {
                                        (dlgRef as Action<TArg>).Invoke(arg.Item2);
                                    }
                                    catch (Exception e)
                                    {
                                        // trouble invoking the target observer's reference, mark it for deletion
                                        descopedRefIndexes.Add(handlerList.IndexOf(weakRef));
                                        Console.WriteLine(String.Format("Error looking up target reference: {0}", e.Message));
                                    }
                                }
                                else
                                {
                                    // the target observer's reference has been descoped, mark it for deletion
                                    descopedRefIndexes.Add(handlerList.IndexOf(weakRef));
                                    Console.WriteLine(String.Format("Message type \"{0}\" has been unsubscribed from.", arg.Item1));
                                    MessageBus.IamDone = true;
                                }
                            }
                            // remove any descoped references
                            descopedRefIndexes.ForEach(index => handlerList.RemoveAt(index));
                        }
                    });
            }
            // catch all Exceptions
            catch (AggregateException e)
            {
                Console.WriteLine(String.Format("Error dispatching messages: {0}", e.Message));
            }
        }

        /// <summary>
        /// Subscribes the specified delegate to handle messages of type TMessage
        /// </summary>
        /// <typeparam name="TArg">Message argument type</typeparam>
        /// <param name="action">WeakReference that represents the handler for this message type to be registered with the bus</param>
        public static void Subscribe<TArg>(Action<TArg> action)
            where TArg : class
        {
            // validate input
            if (action == null)
                throw new ArgumentNullException(String.Format("Error subscribing to message type \"{0}\": Specified action reference is null.", typeof(TArg)));
            // build the queue work item key identifier
            string key = typeof(TArg).ToString();
            // check if a message of this type was already added to the bus
            if (!handlersDict.ContainsKey(key))
            {
                // no, it was not, create a new dictionary entry and add the new observer's reference to it
                List<WeakReference> newHandlerList = new List<WeakReference>();
                handlersDict.TryAdd(key, newHandlerList);
            }
            lock (_lockSubscribing)
            {
                // append this new observer's reference to the list, if it does not exist already
                if (!handlersDict[key].Any(existing => (existing.Target as Delegate) != null && (existing.Target as Delegate).Equals(action)))
                {
                    // append the new reference
                    handlersDict[key].Add(new WeakReference(action, true));
                }
            }
        }
    }
}
...