Как я могу продублировать различный тип объединения F # в C #? - PullRequest
12 голосов
/ 23 февраля 2010

Я создал новый класс под названием Actor, который обрабатывает передаваемые ему сообщения. Проблема, с которой я сталкиваюсь, состоит в том, чтобы выяснить, какой самый элегантный способ передать связанные, но разные сообщения Актеру. Моя первая идея - использовать наследование, но оно кажется раздутым, но это строго типы, что является обязательным требованием.

Есть идеи?

Пример

private abstract class QueueMessage { }

private class ClearMessage : QueueMessage 
{
    public static readonly ClearMessage Instance = new ClearMessage();

    private ClearMessage() { }
}

private class TryDequeueMessage : QueueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();

    private TryDequeueMessage() { }
}

private class EnqueueMessage : QueueMessage 
{
    public TValue Item { get; private set; }

    private EnqueueMessage(TValue item)
    {
        Item = item;
    }
}

Актерский класс

/// <summary>Represents a callback method to be executed by an Actor.</summary>
/// <typeparam name="TReply">The type of reply.</typeparam>
/// <param name="reply">The reply made by the actor.</param>
public delegate void ActorReplyCallback<TReply>(TReply reply);

/// <summary>Represents an Actor which receives and processes messages in concurrent applications.</summary>
/// <typeparam name="TMessage">The type of message this actor accepts.</typeparam>
/// <typeparam name="TReply">The type of reply made by this actor.</typeparam>
public abstract class Actor<TMessage, TReply> : IDisposable
{
    /// <summary>The default total number of threads to process messages.</summary>
    private const Int32 DefaultThreadCount = 1;


    /// <summary>Used to serialize access to the message queue.</summary>
    private readonly Locker Locker;

    /// <summary>Stores the messages until they can be processed.</summary>
    private readonly System.Collections.Generic.Queue<Message> MessageQueue;

    /// <summary>Signals the actor thread to process a new message.</summary>
    private readonly ManualResetEvent PostEvent;

    /// <summary>This tells the actor thread to stop reading from the queue.</summary>
    private readonly ManualResetEvent DisposeEvent;

    /// <summary>Processes the messages posted to the actor.</summary>
    private readonly List<Thread> ActorThreads;


    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    public Actor() : this(DefaultThreadCount) { }

    /// <summary>Initializes a new instance of the Genex.Concurrency&lt;TRequest, TResponse&gt; class.</summary>
    /// <param name="thread_count"></param>
    public Actor(Int32 thread_count)
    {
        if (thread_count < 1) throw new ArgumentOutOfRangeException("thread_count", thread_count, "Must be 1 or greater.");

        Locker = new Locker();
        MessageQueue = new System.Collections.Generic.Queue<Message>();
        EnqueueEvent = new ManualResetEvent(true);
        PostEvent = new ManualResetEvent(false);
        DisposeEvent = new ManualResetEvent(true);
        ActorThreads = new List<Thread>();

        for (Int32 i = 0; i < thread_count; i++)
        {
            var thread = new Thread(ProcessMessages);
            thread.IsBackground = true;
            thread.Start();
            ActorThreads.Add(thread);
        }
    }


    /// <summary>Posts a message and waits for the reply.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <returns>The reply from the actor.</returns>
    public TReply PostWithReply(TMessage message)
    {
        using (var wrapper = new Message(message))
        {
            lock (Locker) MessageQueue.Enqueue(wrapper);
            PostEvent.Set();
            wrapper.Channel.CompleteEvent.WaitOne();
            return wrapper.Channel.Value;
        }
    }

    /// <summary>Posts a message to the actor and executes the callback when the reply is received.</summary>
    /// <param name="value">The message to post to the actor.</param>
    /// <param name="callback">The callback that will be invoked once the replay is received.</param>
    public void PostWithAsyncReply(TMessage value, ActorReplyCallback<TReply> callback)
    {
        if (callback == null) throw new ArgumentNullException("callback");
        ThreadPool.QueueUserWorkItem(state => callback(PostWithReply(value)));
    }

    /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
    public void Dispose()
    {
        if (DisposeEvent.WaitOne(10))
        {
            DisposeEvent.Reset();
            PostEvent.Set();

            foreach (var thread in ActorThreads)
            {
                thread.Join();
            }

            ((IDisposable)PostEvent).Dispose();
            ((IDisposable)DisposeEvent).Dispose();
        }
    }

    /// <summary>Processes a message posted to the actor.</summary>
    /// <param name="message">The message to be processed.</param>
    protected abstract void ProcessMessage(Message message);

    /// <summary>Dequeues the messages passes them to ProcessMessage.</summary>
    private void ProcessMessages()
    {
        while (PostEvent.WaitOne() && DisposeEvent.WaitOne(10))
        {
            var message = (Message)null;

            while (true)
            {
                lock (Locker)
                {
                    message = MessageQueue.Count > 0 ?
                        MessageQueue.Dequeue() :
                        null;

                    if (message == null)
                    {
                        PostEvent.Reset();
                        break;
                    }
                }

                try
                {
                    ProcessMessage(message);
                }
                catch
                {

                }
            }
        }
    }


    /// <summary>Represents a message that is passed to an actor.</summary>
    protected class Message : IDisposable
    {
        /// <summary>The actual value of this message.</summary>
        public TMessage Value { get; private set; }

        /// <summary>The channel used to give a reply to this message.</summary>
        public Channel Channel { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Message class.</summary>
        /// <param name="value">The actual value of the message.</param>
        public Message(TMessage value)
        {
            Value = value;
            Channel = new Channel();
        }


        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            Channel.Dispose();
        }
    }

    /// <summary>Represents a channel used by an actor to reply to a message.</summary>         
    protected class Channel : IDisposable
    {
        /// <summary>The value of the reply.</summary>
        public TReply Value { get; private set; }

        /// <summary>Signifies that the message has been replied to.</summary>
        public ManualResetEvent CompleteEvent { get; private set; }


        /// <summary>Initializes a new instance of Genex.Concurrency.Channel class.</summary>
        public Channel()
        {
            CompleteEvent = new ManualResetEvent(false);
        }

        /// <summary>Reply to the message received.</summary>
        /// <param name="value">The value of the reply.</param>
        public void Reply(TReply value)
        {
            Value = value;
            CompleteEvent.Set();
        }

        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
            ((IDisposable)CompleteEvent).Dispose();
        }
    }
}

Ответы [ 6 ]

12 голосов
/ 24 февраля 2010

Стив Гилхэм суммировал, как компилятор на самом деле обрабатывает различимые объединения. Для вашего собственного кода вы могли бы рассмотреть упрощенную версию этого. Учитывая следующее F #:

type QueueMessage<T> = ClearMessage | TryDequeueMessage | EnqueueMessage of T

Вот один из способов эмулировать его в C #:

public enum MessageType { ClearMessage, TryDequeueMessage, EnqueueMessage }

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract MessageType MessageType { get; }

    /// <summary>
    /// Only applies to EnqueueMessages
    /// </summary>
    public abstract T Item { get; }

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }


    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.ClearMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override MessageType MessageType
        {
            get { return MessageType.TryDequeueMessage; }
        }

        /// <summary>
        /// Not implemented by this subclass
        /// </summary>
        public override T Item
        {
            get { throw new NotImplementedException(); }
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override MessageType MessageType
        {
            get { return MessageType.EnqueueMessage; }
        }

        /// <summary>
        /// Gets the item to be enqueued
        /// </summary>
        public override T Item { get { return item; } }
    }
}

Теперь в коде, который получает QueueMessage, вы можете включить свойство MessageType вместо сопоставления с шаблоном и убедиться, что вы получаете доступ к свойству Item только в EnqueueMessage s.

EDIT

Вот еще одна альтернатива, основанная на коде Джульетты. Я попытался упростить процесс, чтобы у него был более удобный интерфейс из C #. Это предпочтительнее предыдущей версии, поскольку вы не можете получить исключение MethodNotImplemented.

public abstract class QueueMessage<T>
{
    // prevents unwanted subclassing
    private QueueMessage() { }

    public abstract TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase);

    public static QueueMessage<T> MakeClearMessage() { return new ClearMessage(); }
    public static QueueMessage<T> MakeTryDequeueMessage() { return new TryDequeueMessage(); }
    public static QueueMessage<T> MakeEnqueueMessage(T item) { return new EnqueueMessage(item); }

    private sealed class ClearMessage : QueueMessage<T>
    {
        public ClearMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return clearCase();
        }
    }

    private sealed class TryDequeueMessage : QueueMessage<T>
    {
        public TryDequeueMessage() { }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return tryDequeueCase();
        }
    }

    private sealed class EnqueueMessage : QueueMessage<T>
    {
        private T item;
        public EnqueueMessage(T item) { this.item = item; }

        public override TReturn Match<TReturn>(Func<TReturn> clearCase, Func<TReturn> tryDequeueCase, Func<T, TReturn> enqueueCase)
        {
            return enqueueCase(item);
        }
    }
}

Вы использовали бы этот код так:

public class MessageUserTest
{
    public void Use()
    {
        // your code to get a message here...
        QueueMessage<string> msg = null; 

        // emulate pattern matching, but without constructor names
        int i =
            msg.Match(
                clearCase:      () => -1,
                tryDequeueCase: () => -2,
                enqueueCase:     s =>  s.Length);
    }
}
6 голосов
/ 24 февраля 2010

В вашем примере кода вы реализуете PostWithAsyncReply в терминах PostWithReply. Это не идеально, потому что это означает, что когда вы вызываете PostWithAsyncReply, и субъекту требуется некоторое время, чтобы обработать его, на самом деле связаны два потока: один исполняющий субъект и второй ожидающий его завершения. Было бы лучше иметь один поток, выполняющий актер, а затем вызывать обратный вызов в асинхронном случае. (Очевидно, что в синхронном случае нельзя избежать связывания двух потоков).

Обновление:

Подробнее об этом: вы создаете актера с аргументом, указывающим, сколько потоков запустить. Для простоты предположим, что каждый актер работает с одним потоком (на самом деле это довольно хорошая ситуация, поскольку у актеров может быть внутреннее состояние без блокировки, так как только один поток обращается к нему напрямую).

Актер A вызывает актера B, ожидая ответа. Чтобы обработать запрос, субъект B должен вызвать субъекта C. Таким образом, теперь только потоки A и B ожидают, а C - единственный, который фактически дает процессору какую-либо работу. Так много для многопоточности! Но это то, что вы получите, если будете постоянно ждать ответов.

Хорошо, вы можете увеличить количество потоков, которые вы запускаете в каждом актере. Но вы начинаете их, чтобы они могли сидеть и ничего не делать. Стек занимает много памяти, а переключение контекста может быть дорогим.

Так что лучше отправлять сообщения асинхронно с механизмом обратного вызова, чтобы вы могли получить готовый результат. Проблема с вашей реализацией заключается в том, что вы извлекаете другой поток из пула потоков, просто чтобы сидеть и ждать. Таким образом, вы в основном применяете обходной путь увеличения количества потоков. Вы выделяете поток для задачи , которая никогда не выполняется .

Было бы лучше реализовать PostWithReply в терминах PostWithAsyncReply, то есть наоборот. Асинхронная версия является низкоуровневой. Основываясь на моем примере на основе делегатов (потому что он требует меньше ввода кода!):

private bool InsertCoinImpl(int value) 
{
    // only accept dimes/10p/whatever it is in euros
    return (value == 10);
}

public void InsertCoin(int value, Action<bool> accepted)
{
    Submit(() => accepted(InsertCoinImpl(value)));
}

Так что частная реализация возвращает bool. Открытый асинхронный метод принимает действие, которое получит возвращаемое значение; как частная реализация, так и действие обратного вызова выполняются в одном потоке.

Надеюсь, необходимость синхронного ожидания будет делом меньшинства. Но когда вам это нужно, он может быть предоставлен вспомогательным методом, полностью общего назначения и не привязанным к какому-либо конкретному субъекту или типу сообщения:

public static T Wait<T>(Action<Action<T>> activity)
{
    T result = default(T);
    var finished = new EventWaitHandle(false, EventResetMode.AutoReset);

    activity(r =>
        {
            result = r;
            finished.Set();
        });

    finished.WaitOne();
    return result;
}

Так что теперь в каком-то другом актере мы можем сказать:

bool accepted = Helpers.Wait<bool>(r => chocMachine.InsertCoin(5, r));

Аргумент типа для Wait может быть ненужным, не пытался скомпилировать ничего из этого. Но Wait в основном фокусирует обратный вызов для вас, так что вы можете передать его какому-то асинхронному методу, и снаружи вы просто возвращаете все, что было передано в обратный вызов, в качестве возвращаемого значения. Обратите внимание, что лямбда, которую вы передаете Wait, все еще фактически выполняется в том же потоке, который вызвал Wait.

Теперь мы вернемся к нашей обычной программе ...

Что касается фактической проблемы, о которой вы спрашивали, вы отправляете актеру сообщение, чтобы заставить его что-то сделать. Делегаты полезны здесь. Они позволяют эффективно заставить компилятор генерировать вам класс с некоторыми данными, конструктор, который вам даже не требуется вызывать явно, а также метод. Если вам нужно написать кучу маленьких классов, переключитесь на делегатов.

abstract class Actor
{
    Queue<Action> _messages = new Queue<Action>();

    protected void Submit(Action action)
    {
        // take out a lock of course
        _messages.Enqueue(action);
    }

    // also a "run" that reads and executes the 
    // message delegates on background threads
}

Теперь конкретный производный актер следует этому шаблону:

class ChocolateMachineActor : Actor
{
    private void InsertCoinImpl(int value) 
    {
        // whatever...
    }

    public void InsertCoin(int value)
    {
        Submit(() => InsertCoinImpl(value));
    }
}

Итак, чтобы отправить сообщение актеру, вы просто вызываете публичные методы. Частный метод Impl делает настоящую работу. Не нужно писать кучу классов сообщений вручную.

Очевидно, я упустил материал об ответах, но это можно сделать с помощью большего количества параметров. (См. Обновление выше).

6 голосов
/ 23 февраля 2010

Типы объединения и сопоставления с шаблоном сопоставляются непосредственно с шаблоном посетителя. Я уже писал об этом несколько раз:

Так что, если вы хотите передавать сообщения с множеством разных типов, вы застряли в реализации шаблона посетителя.

(Предупреждение, непроверенный код впереди, но он должен дать вам представление о том, как это делается)

Допустим, у нас есть что-то вроде этого:

type msg =
    | Add of int
    | Sub of int
    | Query of ReplyChannel<int>


let rec counts = function
    | [] -> (0, 0, 0)
    | Add(_)::xs -> let (a, b, c) = counts xs in (a + 1, b, c)
    | Sub(_)::xs -> let (a, b, c) = counts xs in (a, b + 1, c)
    | Query(_)::xs -> let (a, b, c) = counts xs in (a, b, c + 1)

Вы получите этот громоздкий код C #:

interface IMsgVisitor<T>
{
    T Visit(Add msg);
    T Visit(Sub msg);
    T Visit(Query msg);
}

abstract class Msg
{
    public abstract T Accept<T>(IMsgVistor<T> visitor)
}

class Add : Msg
{
    public readonly int Value;
    public Add(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Sub : Msg
{
    public readonly int Value;
    public Add(int value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

class Query : Msg
{
    public readonly ReplyChannel<int> Value;
    public Add(ReplyChannel<int> value) { this.Value = value; }
    public override T Accept<T>(IMsgVisitor<T> visitor) { return visitor.Visit(this); }
}

Теперь, когда вы хотите что-то сделать с сообщением, вам нужно внедрить посетителя:

class MsgTypeCounter : IMsgVisitor<MsgTypeCounter>
{
    public readonly Tuple<int, int, int> State;    

    public MsgTypeCounter(Tuple<int, int, int> state) { this.State = state; }

    public MsgTypeCounter Visit(Add msg)
    {
        Console.WriteLine("got Add of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(1 + State.Item1, State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Sub msg)
    {
        Console.WriteLine("got Sub of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
    }

    public MsgTypeCounter Visit(Query msg)
    {
        Console.WriteLine("got Query of " + msg.Value);
        return new MsgTypeCounter(Tuple.Create(State.Item1, 1 + State.Item2, State.Item3));
    }
}

Тогда, наконец, вы можете использовать его так:

var msgs = new Msg[] { new Add(1), new Add(3), new Sub(4), new ReplyChannel(null) };
var counts = msgs.Aggregate(new MsgTypeVisitor(Tuple.Create(0, 0, 0)),
    (acc, x) => x.Accept(acc)).State;

Да, это так глупо, как кажется, но именно так вы передаете несколько сообщений классу безопасным для типов способом, и именно поэтому мы не реализуем объединения в C #;)

2 голосов
/ 23 февраля 2010

Если у вас есть

type internal Either<'a, 'b> =
  | Left of 'a
  | Right of 'b

в F #, тогда эквивалент CR CLR, сгенерированного для класса Either<'a, 'b>, имеет внутренние типы, такие как

internal  class _Left : Either<a, b>
{
     internal readonly a left1;
     internal _Left(a left1);
}

каждый с тегом, геттером и фабричным методом

internal const  int tag_Left = 0;
internal static  Either<a, b> Left(a Left1);
internal a Left1 {  get; }

плюс дискриминатор

internal int  Tag { get; }

и множество методов для реализации интерфейсов IStructuralEquatable, IComparable, IStructuralComparable

2 голосов
/ 23 февраля 2010

Длинный выстрел, но все равно ..

Я предполагаю, что дискриминированное объединение - это F # для ADT (абстрактный тип данных). Это означает, что тип может быть одним из нескольких.

Если их два, вы можете попробовать поместить их в простой обобщенный класс с двумя параметрами типа:

 public struct DiscriminatedUnion<T1,T2>
 {   
      public DiscriminatedUnion(T1 t1) { value = t1; }
      public DiscriminatedUnion(T2 t1) { value = t2; }


      public static implicit operator T1(DiscriminatedUnion<T1,T2> du) {return (T1)du.value; }
      public static implicit operator T2(DiscriminatedUnion<T1,T2> du) {return (T2)du.value; }

      object value;
 }

Чтобы он работал 3 или более раз, нам нужно повторить этот класс несколько раз. У любого есть решение для перегрузки функции в зависимости от типа времени выполнения?

0 голосов
/ 05 сентября 2011

В C #

имеется дискриминированный тип объединения, проверенный во время компиляции
private class ClearMessage
{
    public static readonly ClearMessage Instance = new ClearMessage();    
    private ClearMessage() { }
}

private class TryDequeueMessage 
{
    public static readonly TryDequeueMessage Instance = new TryDequeueMessage();    
    private TryDequeueMessage() { }
}

private class EnqueueMessage
{
    public TValue Item { get; private set; }    
    private EnqueueMessage(TValue item) { Item = item; }
}

Использование дискриминационного союза может быть сделано следующим образом:

// New file
// Create an alias
using Message = Union<ClearMessage, TryDequeueMessage, EnqueMessage>;

int ProcessMessage(Message msg)
{
   return Message.Match(
      clear => 1,
      dequeue => 2,
      enqueue => 3);
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...