Как передать Json на постоянно открытый сокет с Json.Net? - PullRequest
0 голосов
/ 06 мая 2019

Я пытаюсь передать Json между клиентом и сервером через постоянно открытый сокет. И я столкнулся с несколькими проблемами:

Вначале я столкнулся с такой проблемой, что при передаче нескольких данных последний Json не обрабатывался и ожидал дополнительных данных. Когда после этого я передал больше данных, непрочитанный Json был обработан как следующие.

2019/05/06 12:34:44.495|INFO|Test.Server.Tcp.Program|98 [...json dump...]
2019/05/06 12:34:44.603|INFO|Test.Server.Tcp.Program|99 [...json dump...]
2019/05/06 12:34:54.787|INFO|Test.Server.Tcp.Program|100 [...json dump...]
2019/05/06 12:34:54.823|INFO|Test.Server.Tcp.Program|101 [...json dump...]

В этом случае сначала я отправил Json 100 раз, а через 10 секунд еще 5 раз. Журналы показывают, что 99 были обработаны в первую очередь, а через 10 секунд остальные.

Насколько я понимаю, это как-то связано с тем, что метод Read () из открытого потока вешает поток, пока не будут получены следующие данные. Я попытался использовать свойство ReceiveTimeout, игнорируя ошибку SocketError.TimedOut. В результате при отправке Json 100 раз на последнем после ожидания тайм-аута выдается ошибка десериализации.

2019/05/06 12:58:38.743|INFO|Test.Server.Tcp.Program|98 [...json dump...]
2019/05/06 12:58:38.743|INFO|Test.Server.Tcp.Program|99 [...json dump...]
2019/05/06 12:58:39.528|FATAL|Test.Server.Tcp.Program|Newtonsoft.Json.JsonSerializationException: Unexpected token while deserializing object: PropertyName. Path 'Data.UserId', line 898, position 13.
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateValueInternal(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.Deserialize(JsonReader reader, Type objectType, Boolean checkAdditionalContent)
   at Newtonsoft.Json.JsonSerializer.DeserializeInternal(JsonReader reader, Type objectType)
   at Newtonsoft.Json.JsonSerializer.Deserialize[T](JsonReader reader)
   at TcpJsonCarrier`1.<ReceiveRun>b__13_0() in D:\Projects\...path...\TcpJsonCarrier.cs:line 71

Класс для работы с потоком:

public class TcpJsonCarrier<T> : IDisposable {
        private readonly ILogger _logger;
        private readonly CancellationToken _cancellationToken;

        private readonly ConcurrentQueue<T> _incomingData;
        private readonly ConcurrentQueue<T> _outgoingData;

        private readonly StreamReader _streamReader;
        private readonly StreamWriter _streamWriter;

        private readonly JsonTextReader _jsonTextReader;
        private readonly JsonTextWriter _jsonTextWriter;

        private readonly JsonSerializer _jsonSerializer;


        public TcpJsonCarrier(ILogger logger, TcpClient tcpClient) :
            this(logger, tcpClient, CancellationToken.None) { }

        public TcpJsonCarrier(ILogger logger, TcpClient tcpClient, CancellationToken cancellationToken) {
            _logger = logger;
//            tcpClient.ReceiveTimeout = 1000;
            _cancellationToken = cancellationToken;

            _incomingData = new ConcurrentQueue<T>();
            _outgoingData = new ConcurrentQueue<T>();

            _streamReader = new StreamReader(tcpClient.GetStream(), Encoding.UTF8, true, 1024, true);
            _streamWriter = new StreamWriter(tcpClient.GetStream(), Encoding.UTF8, 1024, true) {AutoFlush = true};

            _jsonTextReader = new JsonTextReader(_streamReader) {SupportMultipleContent = true};
            _jsonTextWriter = new JsonTextWriter(_streamWriter);

            _jsonSerializer = new JsonSerializer {
                Formatting = Formatting.Indented,
                TypeNameHandling = TypeNameHandling.Auto
            };

            ReceiveRun();
            SendRun();
        }

        public IEnumerable<T> Read() {
            while (_incomingData.TryDequeue(out var data)) {
                yield return data;
            }
        }

        public void Write(T data) {
            _outgoingData.Enqueue(data);
        }

        private Task ReceiveRun() {
            return Task.Factory.StartNew(async () => {
                while (true) {
                    await Task.Delay(1, _cancellationToken);

                    try {
                        while (_jsonTextReader.Read())
                            _incomingData.Enqueue(_jsonSerializer.Deserialize<T>(_jsonTextReader));
                    }
                    catch (IOException ioe) when (ioe.InnerException is SocketException se &&
                                                  se.SocketErrorCode == SocketError.TimedOut) { }
                    catch (Exception e) {
                        _logger.LogCritical(e.ToString());
                        throw;
                    }
                }
            }, TaskCreationOptions.LongRunning);
        }

        private Task SendRun() {
            return Task.Factory.StartNew(async () => {
                while (true) {
                    try {
                        await Task.Delay(1, _cancellationToken);

                        _cancellationToken.ThrowIfCancellationRequested();

                        while (_outgoingData.TryDequeue(out var data)) {
                            _jsonSerializer.Serialize(_jsonTextWriter, data);
                        }
                    }
                    catch (Exception e) {
                        _logger.LogCritical(e.ToString());
                        throw;
                    }
                }
            }, TaskCreationOptions.LongRunning);
        }

        public void Dispose() {
            _streamReader?.Dispose();
            _streamWriter?.Dispose();
            ((IDisposable) _jsonTextReader)?.Dispose();
            ((IDisposable) _jsonTextWriter)?.Dispose();
        }
    }

Сервер:

class Program {
        static void Main(string[] args) {
            var loggerFactory = new LoggerFactory().AddNLog();
            var logger = loggerFactory.CreateLogger<Program>();

            var tcpListener = new TcpListener(IPAddress.Any, 9100);
            tcpListener.Start();

            var sem = new SemaphoreSlim(1, 1);
            while (true) {
                sem.Wait();

                tcpListener
                    .AcceptTcpClientAsync()
                    .ContinueWith(async task => {
                        var tcpClient = await task;

                        logger.LogInformation("Client accepted");

                        sem.Release();

                        var tcpJsonCarrier = new TcpJsonCarrier<Message>(logger, tcpClient);

                        var count = 0;
                        while (true) {
                            await Task.Delay(1);

                            foreach (var message in tcpJsonCarrier.Read()) {
                                logger.LogInformation($"{++count} {message}");
                            }
                        }
                    }, TaskContinuationOptions.LongRunning);
            }
        }
    }

Клиент:

class Program {
        static void Main(string[] args) {
            var loggerFactory = new LoggerFactory().AddNLog();
            var logger = loggerFactory.CreateLogger<Program>();

            var client = new TcpClient();
            client
                .ConnectAsync("...remoteIp...", 9100)
                .ContinueWith(async task => {
                    await task;

                    logger.LogInformation("Connected");

                    var tcpJsonCarrier = new TcpJsonCarrier<Message>(logger, client);

                    for (int i = 0; i < 100; i++) {
                        await Task.Delay(100);

                        tcpJsonCarrier.Write(new Message(...messageData...));
                    }

                    await Task.Delay(10000);

                    for (int i = 0; i < 5; i++) {
                        await Task.Delay(100);

                        tcpJsonCarrier.Write(new Message(...messageData...));
                    }
                }, TaskContinuationOptions.LongRunning);

            while (true) { }
        }
    }

В данный момент я нахожусь в тупике.

...