У меня есть потоковый сервер, который с контрактом выглядит примерно так:
[ServiceContract]
public interface IStreamingService
{
[OperationContract(Action = "StreamingMessageRequest", ReplyAction = "StreamingMessageReply")]
Message GetStreamingData(Message query);
}
Вот элементарная реализация с удаленными вещами (такими как обработка ошибок), чтобы упростить вещи:
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
[ErrorBehavior(typeof(StreamingServiceErrorHandler))]
public class StreamingService : IStreamingService
{
public StreamingService()
{
}
public Message GetStreamingData(Message query)
{
var dataQuery = query.GetBody<DataQuery>();
// Hook up events to let us know if the client disconnects so that we can stop the query...
EventHandler closeAction = (sender, ea) =>
{
dataQuery.Stop();
};
OperationContext.Current.Channel.Faulted += closeAction;
OperationContext.Current.Channel.Closed += closeAction;
Message streamingMessage = Message.CreateMessage(
MessageVersion.Soap12WSAddressing10,
"QueryMessageReply",
new StreamingBodyWriter(QueryMethod(dataQuery));
return streamingMessage;
}
public IEnumerable<object> QueryMethod (DataQuery query)
{
// Returns a potentially infinite stream of objects in response to the query
}
}
Эта реализация использует собственный BodyWriter для потоковой передачи результатов из QueryMethod:
public class StreamingBodyWriter : BodyWriter
{
public StreamingBodyWriter(IEnumerable items)
: base(false) // False should be passed here to avoid buffering the message
{
Items = items;
}
internal IEnumerable Items { get; private set; }
private void SerializeObject(XmlDictionaryWriter writer, object item)
{
// Serialize the object to the stream
}
protected override void OnWriteBodyContents(XmlDictionaryWriter writer)
{
foreach (object item in Items)
{
SerializeObject(writer, item);
}
}
}
Клиент подключается и начинает читать поток данных. Примерно так:
public IEnumerable<T> GetStreamingData<T>(Message queryMessage)
{
Message reply = _Server.GetStreamingData(queryMessage);
// Get a chunckable reader for the streaming reply
XmlReader reader = reply.GetReaderAtBodyContents();
// Read the stream of objects, deserializing each one as appropriate
while (!reader.EOF)
{
object item = DeserializeObject(reader);
if (item == null)
continue;
// Yield each item as it's deserialized, resulting in a [potentially] never-ending stream of objects
yield return (T)item;
}
}
Это работает очень хорошо, и я возвращаю поток объектов клиенту. Очень хорошо. Проблема в том, что клиент отключается в середине потока (грациозно или грациозно). В любом случае сервер не получает уведомления об этом отключении, кроме как по ошибке, обнаруженной обработчиком ошибок службы.
Как видите, я пытался перехватить события Failed и Closed channel в методе service, но они не запускаются, когда клиент отключается.
Я подозреваю, что эти события не срабатывают, потому что с потоковым контрактом служба уже вернулась из метода операции (GetStreamingData). Этот метод возвратил сообщение с пользовательским модулем записи тела, и именно эта запись тела (ниже в стеке служебного канала) извлекает результаты из потока вычислений (через IEnumerable) и передает их в ответное сообщение. Так как метод операции вернулся, то я предполагаю, что эти события канала не срабатывают.
Привязка является настроенной версией привязки net.tcp (недуплексной) только с двумя элементами привязки: BinaryMessageEncodingBindingElement и TcpTransportBindingElement. Нет безопасности или что-нибудь. В основном просто raw net.tcp с двоичными сообщениями.
Проблема в том, что когда клиент отключается, я хочу, чтобы сервер остановил поток фоновых вычислений, который генерирует результаты и передает их в IEnumerable, читаемый BodyWriter. Создатель тела остановился (очевидно), но поток продолжает жить.
Так, где я могу поймать, чтобы обнаружить, отключил ли клиент mid-stream?
Обновление:
Существует два основных случая отключения: явное отключение, вызванное либо удалением клиентского прокси, либо завершением процесса на клиенте; Пассивное отключение, как правило, из-за сбоя в сети (пример: провайдер разрывает соединение, или сетевой кабель дергается).
В первом случае существует явное исключение соединения, которое получает сервер, когда он пытается отправить новые данные по потоку. Нет проблем.
Второй сценарий разрыва сетевого канала более проблематичен. Обычно это условие обнаруживается на основе тайм-аута отправки, но, поскольку это потоковый интерфейс, тайм-аут отправки увеличен (у меня он увеличился до нескольких дней, поскольку вполне возможно, что потоковые передачи данных могут длиться так долго). Поэтому, когда клиент отключен из-за нестабильного сетевого канала, служба продолжает отправлять данные по соединению, которое действительно существует.
На данный момент я не знаю хорошего способа решить второй вариант. Предложения?