Я написал агент транспорта Exchange, который загружает текст сообщения электронной почты в службу WCF. Служба находится в том же окне, что и Exchange, и прослушивает localhost: 1530 для получения входящих TCP-соединений. Транспортный агент реализован с помощью .NET 3.5 Framework, а служба размещается в службе Windows, реализованной на платформе .NET 4.0.
Мой вопрос: почему поток завершается до завершения чтения и как я могу это исправить?
Договор на обслуживание определяется следующим образом:
[ServiceContract]
public interface IReceiveService
{
[OperationContract]
Guid ImportMessage(DateTime dateTimeReceived, string from, IList<string> to, string subject, int attachmentCount, bool bodyEmpty, Guid clientID);
[OperationContract]
void ImportMessageBody(IMessageBody body);
[OperationContract]
void ImportMessageAttachment(IMessageAttachment attachment);
}
Обновление : я переупорядочил это, чтобы кому-то было проще быстро прочитать проблему, не обязательно читая остальную часть моего описания, которое длинное. Первый бит, показывающий, как я запускаю Task
для обработки запросов, кажется проблемой. Действительно, если я закомментирую часть Task.Factory.StartNew()
, Stream.CopyTo
работает.
В своей реализации сервиса я пытаюсь использовать Stream.CopyTo
для копирования входящего потока во временный файл, например так:
public void ImportMessageBody(IMessageBody body)
{
Task.Factory.StartNew(() =>
{
string fileName = GetFileNameFromMagicalPlace();
using (FileStream fs = new FileStream(fileName, FileMode.Append))
{
body.Data.CopyTo(fs); // <---- throws System.IOException right here
}
});
}
Ошибка исключения: «ошибка: возникла исключительная ситуация при чтении потока». Трассировка стека:
at System.ServiceModel.Dispatcher.StreamFormatter.MessageBodyStream.Read(Byte[] buffer, Int32 offset, Int32 count)
at System.IO.Stream.CopyTo...
Существует внутреннее исключение:
System.Xml.XmlException: Unexpected end of file. Following elements are not closed: Address, IMessageBody, Body, Envelope.
at System.Xml.XmlExceptionHelper.ThrowXmlException(XmlDictionaryReader reader, String res, String arg1, String arg2, String arg3)
at System.Xml.XmlBufferReader.GetByteHard()
at System.Xml.XmlBufferReader.ReadMultiByteUInt31()
at System.Xml.XmlBinaryReader.ReadName(StringHandle handle)
at System.Xml.XmlBinaryReader.ReadNode()
at System.ServiceModel.Dispatcher.StreamFormatter.MessageBodyStream.Read(Byte[] buffer, Int32 offset, Int32 count)
Другие подробности следуют.
IMessageBody
определяется следующим образом:
[MessageContract]
public class IMessageBody
{
[MessageHeader]
public Guid MessageID { get; set; }
[MessageHeader]
public string Format { get; set; }
[MessageBodyMember]
public System.IO.Stream Data { get; set; }
}
Сокращенная версия транспортного агента с соответствующими битами (надеюсь):
public class Agent : RoutingAgent
{
public delegate void PostingDelegate(MailItem item);
IReceiveService service;
public Agent()
{
string tcpServiceUri = "net.tcp://localhost:1530";
NetTcpBinding endpointBinding = new NetTcpBinding();
endpointBinding.TransferMode = TransferMode.Streamed;
ServiceEndpoint serviceEndpoint = new ServiceEndpoint(
ContractDescription.GetContract(typeof(IReceiveService)),
endpointBinding,
new EndpointAddress(tcpServiceUri));
ChannelFactory<IReceiveService> factory = new ChannelFactory<IReceiveService>(serviceEndpoint);
service = factory.CreateChannel();
this.OnSubmittedMessage += new SubmittedMessageEventHandler(Agent_OnSubmittedMessage);
}
void Agent_OnSubmittedMessage(SubmittedMessageEventSource source, QueuedMessageEventArgs e)
{
if (TheseAreTheDroidsImLookingFor(e))
{
PostingDelegate del = PostData;
del.BeginInvoke(e.MailItem, CompletePost, GetAgentAsyncContext());
}
}
void PostData(MailItem item)
{
// Body class is basically direct implementation of IMessageBody
// with a constructor to set up the public properties from MailItem.
var body = new Body(item);
service.ImportMessageBody(body);
}
void CompletePost(IAsyncResult ar)
{
var context = ar.AsyncState as AgentAsyncContext;
context.Complete();
}
}
Наконец, реализация сервиса размещается так:
string queueUri = String.Format("net.tcp://localhost:{0}/{1}", port, serviceName);
try
{
host = new ServiceHost(typeof(ReceiveService), new Uri(queueUri));
host.AddDefaultEndpoints();
var endpoint = host.Description.Endpoints.First();
((NetTcpBinding)endpoint.Binding).TransferMode = TransferMode.Streamed;
trace.Log(Level.Debug,String.Format("Created service host: {0}", host));
host.Open();
trace.Log(Level.Debug,"Opened service host.");
}
catch (Exception e)
{
string message = String.Format("Threw exception while attempting to create ServiceHost for {0}:{1}\n{2}", queueUri, e.Message, e.StackTrace);
trace.Log(Level.Debug,message);
trace.Log(Level.Error, message);
}