Я пытаюсь передать Json между клиентом и сервером через постоянно открытый сокет. И я столкнулся с несколькими проблемами:
Вначале я столкнулся с такой проблемой, что при передаче нескольких данных последний Json не обрабатывался и ожидал дополнительных данных. Когда после этого я передал больше данных, непрочитанный Json был обработан как следующие.
2019/05/06 12:34:44.495|INFO|Test.Server.Tcp.Program|98 [...json dump...]
2019/05/06 12:34:44.603|INFO|Test.Server.Tcp.Program|99 [...json dump...]
2019/05/06 12:34:54.787|INFO|Test.Server.Tcp.Program|100 [...json dump...]
2019/05/06 12:34:54.823|INFO|Test.Server.Tcp.Program|101 [...json dump...]
В этом случае сначала я отправил Json 100 раз, а через 10 секунд еще 5 раз.
Журналы показывают, что 99 были обработаны в первую очередь, а через 10 секунд остальные.
Насколько я понимаю, это как-то связано с тем, что метод Read () из открытого потока вешает поток, пока не будут получены следующие данные.
Я попытался использовать свойство ReceiveTimeout, игнорируя ошибку SocketError.TimedOut. В результате при отправке Json 100 раз на последнем после ожидания тайм-аута выдается ошибка десериализации.
2019/05/06 12:58:38.743|INFO|Test.Server.Tcp.Program|98 [...json dump...]
2019/05/06 12:58:38.743|INFO|Test.Server.Tcp.Program|99 [...json dump...]
2019/05/06 12:58:39.528|FATAL|Test.Server.Tcp.Program|Newtonsoft.Json.JsonSerializationException: Unexpected token while deserializing object: PropertyName. Path 'Data.UserId', line 898, position 13.
at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateValueInternal(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)
at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.Deserialize(JsonReader reader, Type objectType, Boolean checkAdditionalContent)
at Newtonsoft.Json.JsonSerializer.DeserializeInternal(JsonReader reader, Type objectType)
at Newtonsoft.Json.JsonSerializer.Deserialize[T](JsonReader reader)
at TcpJsonCarrier`1.<ReceiveRun>b__13_0() in D:\Projects\...path...\TcpJsonCarrier.cs:line 71
Класс для работы с потоком:
public class TcpJsonCarrier<T> : IDisposable {
private readonly ILogger _logger;
private readonly CancellationToken _cancellationToken;
private readonly ConcurrentQueue<T> _incomingData;
private readonly ConcurrentQueue<T> _outgoingData;
private readonly StreamReader _streamReader;
private readonly StreamWriter _streamWriter;
private readonly JsonTextReader _jsonTextReader;
private readonly JsonTextWriter _jsonTextWriter;
private readonly JsonSerializer _jsonSerializer;
public TcpJsonCarrier(ILogger logger, TcpClient tcpClient) :
this(logger, tcpClient, CancellationToken.None) { }
public TcpJsonCarrier(ILogger logger, TcpClient tcpClient, CancellationToken cancellationToken) {
_logger = logger;
// tcpClient.ReceiveTimeout = 1000;
_cancellationToken = cancellationToken;
_incomingData = new ConcurrentQueue<T>();
_outgoingData = new ConcurrentQueue<T>();
_streamReader = new StreamReader(tcpClient.GetStream(), Encoding.UTF8, true, 1024, true);
_streamWriter = new StreamWriter(tcpClient.GetStream(), Encoding.UTF8, 1024, true) {AutoFlush = true};
_jsonTextReader = new JsonTextReader(_streamReader) {SupportMultipleContent = true};
_jsonTextWriter = new JsonTextWriter(_streamWriter);
_jsonSerializer = new JsonSerializer {
Formatting = Formatting.Indented,
TypeNameHandling = TypeNameHandling.Auto
};
ReceiveRun();
SendRun();
}
public IEnumerable<T> Read() {
while (_incomingData.TryDequeue(out var data)) {
yield return data;
}
}
public void Write(T data) {
_outgoingData.Enqueue(data);
}
private Task ReceiveRun() {
return Task.Factory.StartNew(async () => {
while (true) {
await Task.Delay(1, _cancellationToken);
try {
while (_jsonTextReader.Read())
_incomingData.Enqueue(_jsonSerializer.Deserialize<T>(_jsonTextReader));
}
catch (IOException ioe) when (ioe.InnerException is SocketException se &&
se.SocketErrorCode == SocketError.TimedOut) { }
catch (Exception e) {
_logger.LogCritical(e.ToString());
throw;
}
}
}, TaskCreationOptions.LongRunning);
}
private Task SendRun() {
return Task.Factory.StartNew(async () => {
while (true) {
try {
await Task.Delay(1, _cancellationToken);
_cancellationToken.ThrowIfCancellationRequested();
while (_outgoingData.TryDequeue(out var data)) {
_jsonSerializer.Serialize(_jsonTextWriter, data);
}
}
catch (Exception e) {
_logger.LogCritical(e.ToString());
throw;
}
}
}, TaskCreationOptions.LongRunning);
}
public void Dispose() {
_streamReader?.Dispose();
_streamWriter?.Dispose();
((IDisposable) _jsonTextReader)?.Dispose();
((IDisposable) _jsonTextWriter)?.Dispose();
}
}
Сервер:
class Program {
static void Main(string[] args) {
var loggerFactory = new LoggerFactory().AddNLog();
var logger = loggerFactory.CreateLogger<Program>();
var tcpListener = new TcpListener(IPAddress.Any, 9100);
tcpListener.Start();
var sem = new SemaphoreSlim(1, 1);
while (true) {
sem.Wait();
tcpListener
.AcceptTcpClientAsync()
.ContinueWith(async task => {
var tcpClient = await task;
logger.LogInformation("Client accepted");
sem.Release();
var tcpJsonCarrier = new TcpJsonCarrier<Message>(logger, tcpClient);
var count = 0;
while (true) {
await Task.Delay(1);
foreach (var message in tcpJsonCarrier.Read()) {
logger.LogInformation($"{++count} {message}");
}
}
}, TaskContinuationOptions.LongRunning);
}
}
}
Клиент:
class Program {
static void Main(string[] args) {
var loggerFactory = new LoggerFactory().AddNLog();
var logger = loggerFactory.CreateLogger<Program>();
var client = new TcpClient();
client
.ConnectAsync("...remoteIp...", 9100)
.ContinueWith(async task => {
await task;
logger.LogInformation("Connected");
var tcpJsonCarrier = new TcpJsonCarrier<Message>(logger, client);
for (int i = 0; i < 100; i++) {
await Task.Delay(100);
tcpJsonCarrier.Write(new Message(...messageData...));
}
await Task.Delay(10000);
for (int i = 0; i < 5; i++) {
await Task.Delay(100);
tcpJsonCarrier.Write(new Message(...messageData...));
}
}, TaskContinuationOptions.LongRunning);
while (true) { }
}
}
В данный момент я нахожусь в тупике.