Как публиковать и подписываться на сообщения класса Message с помощью WCF - PullRequest
2 голосов
/ 03 июня 2011

У меня запущена небольшая служба публикации / поддержки WCF, а удаленные клиенты подписываются и отправляют сообщения (пробовал все виды сложных объектов), и это прекрасно работает.Все интерфейсы отражают (редактируют) тип используемого объекта.Переключение на другой тип объекта требует, чтобы интерфейсы были настроены для соответствия этому типу объекта.Все подписчики получают копию сообщения.

Теперь я пытаюсь сделать то же самое, но с сообщениями класса Message.Клиент создает новое сообщение, инкапсулирует свой объект в сообщение и отправляет его (удаленной) службе, где оно получено надлежащим образом (проверено объектом).Однако, когда сервер отвечает отправкой (обратным вызовом) сообщения обратно исходному клиенту, клиент получает следующее сообщение:

«Сервер не предоставил значимого ответа;это может быть вызвано несоответствием контракта, преждевременным завершением сеанса или внутренней ошибкой сервера. »

Последовательность событий (клиент):

Клиент создает сообщение, (DuplexChannelFactory) AddMessage, -CatchВышеуказанная ошибка

Последовательность событий (сервер):

Хост службы получает сообщение, Сообщение проверено (копировать и создать заново), Выполнить обратный вызов, Нет ошибок.

Переключение обратно набазовый или определяемый пользователем тип, и все проблемы исчезают.Я боролся с этим уже неделю и не приблизился ни к какому решению.Пробовал манипулировать заголовками, воссоздавать сообщение, переключаться на контракты сообщений и пытаться интерпретировать содержимое журналов трассировки и т. Д. Надеюсь, я найду здесь некоторые ответы.

Основной используемый код (лишен большей части обработки ошибок)):

Интерфейсы клиента:

namespace WCFSQL
{
    public class ClientInterfaces
    {

        [ServiceContract(Namespace = "WCFServer", Name = "CallBacks")]
        public interface IMessageCallback
        {
            [OperationContract(Name = "OnMessageAdded", Action = "WCFServer/IMessageCallback/OnMessageAdded", IsOneWay = true)] 
            void OnMessageAdded(Message SQLMessage, DateTime timestamp);
        }

        [ServiceContract(Namespace = "WCFServer", CallbackContract = typeof(IMessageCallback))] 
        public interface IMessage
        {
            [OperationContract(Name = "AddMessage", Action = "WCFServer/IMessage/AddMessage")]
            void AddMessage(Message SQLMessage);

            [OperationContract(Name = "Subscribe", Action = "WCFServer/IMessage/Subscribe")]
            bool Subscribe();

            [OperationContract(Name = "Unsubscribe", Action = "WCFServer/IMessage/Unsubscribe")]
            bool Unsubscribe();
        }
    }
}

Интерфейсы сервера:

namespace WCFSQL
{
    public class ServerInterfaces
    {
        [ServiceContract(Namespace = "WCFServer")]
        public interface IMessageCallback
        {
            [OperationContract(Name = "OnMessageAdded", Action = "WCFServer/IMessageCallback/OnMessageAdded", IsOneWay = true)]
            void OnMessageAdded(Message SQLMessage, DateTime timestamp); 
        }

        [ServiceContract(Namespace = "WCFServer", CallbackContract = typeof(IMessageCallback), SessionMode = SessionMode.Required)]
        public interface IMessage
        {
            [OperationContract(Name = "AddMessage", Action = "WCFServer/IMessage/AddMessage")]
            void AddMessage(Message SQLMessage);

            [OperationContract(Name = "Subscribe", Action = "WCFServer/IMessage/Subscribe")]
            bool Subscribe();

            [OperationContract(Name = "Unsubscribe", Action = "WCFServer/IMessage/Unsubscribe")]
            bool Unsubscribe();
        }
    }
}

Создание сообщения:

// client proxy instance created and opened before

    public static bool WCFSqlLogger(string Program, WCFSQLErrorLogMessage SQLErrorMessage, WCFSqlClientProxy client)
    {
        MessageVersion ver = MessageVersion.CreateVersion(EnvelopeVersion.Soap12, AddressingVersion.WSAddressing10);

        Message Out = Message.CreateMessage(ver, "WCFServer/IMessage/AddMessage", SQLErrorMessage); 

        if (!client.SendMessage(Out))
        {
            Console.WriteLine("Client Main: Unable to send");
            return false;
        }
        return true;
    }

Прокси клиента:

[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, IncludeExceptionDetailInFaults = true)]
[CallbackBehavior(IncludeExceptionDetailInFaults = true, ConcurrencyMode = ConcurrencyMode.Single, UseSynchronizationContext = false)] 
public class WCFSqlClientProxy : ClientInterfaces.IMessageCallback, IDisposable
{
    public ClientInterfaces.IMessage pipeProxy = null;
    DuplexChannelFactory<ClientInterfaces.IMessage> pipeFactory;

    public bool Connect()
    {
        NetTcpBinding newBinding = new NetTcpBinding(SecurityMode.TransportWithMessageCredential);// NetTcpBinding newBinding = new NetTcpBinding(SecurityMode.Transport)
        newBinding.Security.Message.ClientCredentialType = MessageCredentialType.Certificate;
        EndpointAddress newEndpoint = new EndpointAddress(new Uri("net.tcp://host:8000/ISubscribe"), EndpointIdentity.CreateDnsIdentity("Domain")); 

        pipeFactory = new DuplexChannelFactory<ClientInterfaces.IMessage>(new InstanceContext(this), newBinding, newEndpoint); 

        pipeFactory.Credentials.Peer.PeerAuthentication.CertificateValidationMode = X509CertificateValidationMode.PeerOrChainTrust;
        pipeFactory.Credentials.ServiceCertificate.Authentication.RevocationMode = X509RevocationMode.NoCheck; 
        pipeFactory.Credentials.ClientCertificate.SetCertificate(StoreLocation.LocalMachine, StoreName.TrustedPeople, X509FindType.FindByThumbprint, "somestring");

        try
        {
            pipeProxy = pipeFactory.CreateChannel();
            pipeProxy.Subscribe();
            return true;
        }
        catch (Exception e)
        {
            Console.WriteLine("Error opening: {0}", e.Message);
            return false;
        }
    }
    public void Close()
    {
        pipeProxy.Unsubscribe();
    }

    public bool SendMessage(Message SQLMessage)
    {
        try
        {
            Console.WriteLine("Proxy Sending:");
            pipeProxy.AddMessage(SQLMessage); // This is where the eror occurs !!!!!!!!!!!!!!!!!!
            return true;
        }
        catch (Exception e)
        {
            Console.WriteLine("Client Proxy: Error sending: {0}", e.Message); 
        }
        return false;
    }

    public void OnMessageAdded(Message SQLMessage, DateTime timestamp) 
    {
        WCFSQLErrorLogMessage message = SQLMessage.GetBody<WCFSQLErrorLogMessage>();
        Console.WriteLine(message.LogProgram + ": " + timestamp.ToString("hh:mm:ss"));
    }

    public void Dispose()
    {
        Console.WriteLine("Dispose: Unsubscribe");
        pipeProxy.Unsubscribe();
    }
}

Сервис:

namespace WCFSQL
{

[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, IncludeExceptionDetailInFaults = true)]
[CallbackBehavior(IncludeExceptionDetailInFaults = true, ConcurrencyMode = ConcurrencyMode.Single, UseSynchronizationContext = false)] // or ConcurrencyMode.Reentrant
public class WCFSqlServerProxy : ServerInterfaces.IMessage
{
    private static List<ServerInterfaces.IMessageCallback> subscribers = new List<ServerInterfaces.IMessageCallback>(); 
    private static Uri target;
    private static ServiceHost serviceHost;

    public WCFSqlServerProxy(Uri Target) // Singleton
    {
        target = Target;
    }

    public bool Connect()
    {
        serviceHost = new ServiceHost(typeof(WCFSqlServerProxy), target);
        NetTcpBinding newBinding = new NetTcpBinding(SecurityMode.TransportWithMessageCredential);
        newBinding.Security.Transport.ClientCredentialType = TcpClientCredentialType.Certificate; 
        newBinding.Security.Message.ClientCredentialType = MessageCredentialType.Certificate; 
        serviceHost.Credentials.ClientCertificate.Authentication.RevocationMode = X509RevocationMode.NoCheck; // Non-domain members cannot follow the chain?
        serviceHost.Credentials.ServiceCertificate.SetCertificate(StoreLocation.LocalMachine, StoreName.TrustedPeople, X509FindType.FindByThumbprint, "somestring");
        serviceHost.Credentials.ClientCertificate.Authentication.CertificateValidationMode = X509CertificateValidationMode.PeerOrChainTrust;
        serviceHost.AddServiceEndpoint(typeof(ServerInterfaces.IMessage), newBinding, "ISubscribe");

        return true;
    }

    public bool Open()
    {
        serviceHost.Open();
    return true;
    }

    public bool Close()
    {
        serviceHost.Close();
        return true;
    }

    public bool Subscribe()
    {
        try
        {
            ServerInterfaces.IMessageCallback callback = OperationContext.Current.GetCallbackChannel<ServerInterfaces.IMessageCallback>();
            if (!subscribers.Contains(callback))
            {
                subscribers.Add(callback);
                return true;
            }
            else
            {
                return false;
            }
        }
        catch (Exception e)
        {
            return false;
        }
    }

    public bool Unsubscribe()
    {
        try
        {
            ServerInterfaces.IMessageCallback callback = OperationContext.Current.GetCallbackChannel<ServerInterfaces.IMessageCallback>(); 
            if (subscribers.Contains(callback))
            {
                subscribers.Remove(callback);
                return true;
            }
            return false;
        }
        catch (Exception e)
        {
            Console.WriteLine("WCFSqlServerProxy: Unsubscribe - Unsubscribe error {0}", e);
            return false;
        }
    }

    private string GetData()
    {
        MessageProperties messageProperties = ((OperationContext)OperationContext.Current).IncomingMessageProperties;
        RemoteEndpointMessageProperty endpointProperty = messageProperties[RemoteEndpointMessageProperty.Name] as RemoteEndpointMessageProperty;
        string computerName = null;
        try
        {
            string[] computer_name = Dns.GetHostEntry(endpointProperty.Address).HostName.Split(new Char[] { '.' });
            computerName = computer_name[0].ToString();
        }
        catch (Exception e)
        {
            computerName = "NOTFOUND";
            Console.WriteLine("WCFSqlServerProxy: Hostname error:  {0}", e);
        }
        return string.Format("{0} - {1}:{2}", computerName, endpointProperty.Address, endpointProperty.Port);
    }


    public void AddMessage(Message SQLMessage) //Go through the list of connections and call their callback funciton
    {
        subscribers.ForEach(delegate(ServerInterfaces.IMessageCallback callback)
        {
            if (((ICommunicationObject)callback).State == CommunicationState.Opened)
            {
                MessageVersion ver = MessageVersion.CreateVersion(EnvelopeVersion.Soap12, AddressingVersion.WSAddressing10);
                MessageBuffer buffer = SQLMessage.CreateBufferedCopy(4096);

                Message msgCopy = buffer.CreateMessage();
                //System.Xml.XmlDictionaryReader xrdr = msgCopy.GetReaderAtBodyContents();

                WCFSQLErrorLogMessage p = msgCopy.GetBody<WCFSQLErrorLogMessage>();

                SQLMessage = buffer.CreateMessage();
                buffer.Close();

                Message In = Message.CreateMessage(ver, "WCFServer/IMessage/AddMessage", p); // Tried recreating messsage, with same results

                //Console.WriteLine("Message: Header To:      {0}", In.Headers.To);
                //Console.WriteLine("Message: Header From:    {0}", In.Headers.From);
                //Console.WriteLine("Message: Header Action:  {0}", In.Headers.Action);
                //Console.WriteLine("Message: Header ReplyTo: {0}", In.Headers.ReplyTo);
                //Console.WriteLine("Message: IsFault:        {0}", In.IsFault);
                //Console.WriteLine("Message: Properties      {0}", In.Properties);
                //Console.WriteLine("Message: State           {0}", In.State);
                //Console.WriteLine("Message: Type            {0}", In.GetType());
                //Console.WriteLine("Proxy Sending: Copy created");

                //Console.WriteLine("Remote:  {0}, Hash: {1}", GetData(), callback.GetHashCode());

                callback.OnMessageAdded(SQLMessage, DateTime.Now); // This should echo the message back with a timeslot.
            }
            else
            {
                Console.WriteLine("WCFSqlServerProxy:addmessage connected state: {0}", ((ICommunicationObject)callback).State == CommunicationState.Opened);
                subscribers.Remove(callback);
            }
        });
    }
}

1 Ответ

2 голосов
/ 06 июня 2011

Я только что получил ответ на свой вопрос от Танвира Худа, на форуме Microsoft WCF.

«Использование класса сообщений в операциях»

Вы можете использоватьКласс сообщения как входной параметр операции, возвращаемое значение операции или оба. Если сообщение используется где-либо в операции, применяются следующие ограничения:

• Операция не может иметь никаких параметров или других.

• Не может быть более одного входного параметра. Если параметр присутствует, это должен быть либо тип сообщения, либо тип контракта сообщения.

• тип возвращаемого значения: void, Message или тип контракта сообщения.«.

Я не могу поверить, я пропустил это;Должен был прочитать его как минимум три раза, но никогда не применял эти правила к обратному вызову.Обратный вызов в моих описанных интерфейсах действительно имеет тип возврата void, но у него есть Message и параметр DateTime.

После удаления параметра DateTime обратный вызов (попытался) повторно сериализовать исходное Сообщение, но не удалось из-за недопустимого действия (действие все еще было задано для AddMessage, в то время как теперь оно должно быть OnMessageAdded).После изменения действия на обратный вызов на Action = "*" оно заработало отлично.A (возможно, раздражает) подробно излагаю, что мне действительно не требуется тип сообщения при обратном вызове, но я был очень расстроен, что не смог заставить его работать

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...