Почему сокет блокируется от получения, когда я сплю в другом потоке? - PullRequest
0 голосов
/ 07 сентября 2018

У меня есть простое приложение для прослушивания сокетов. Он должен иметь возможность получать запросы и давать ответ, а также сам отправлять запросы и получать ответы на них.

Как только мое приложение запустится, оно начнет получать в отдельный поток и отправит ответ. Эта часть отлично работает.

Однако, когда я отправляю запросы через SendRequest() -метод, мне нужно отфильтровать входящие ответы, чтобы правильные ответы переходили на правильные ранее выполненные запросы. Я делаю это (как показано в коде ниже) с классом ResponseHandler, который позволяет мне зарегистрировать запрос и в ответ уведомляет о моем зарегистрированном запросе, как только поступает правильный ответ. Однако размещенный запрос должен истечь через 10 секунд. , поэтому я использовал CountdownEvent, который ждет эти 10 секунд, но выпускает раньше, если ответ пришел раньше.

Проблема: Мой CountdownEvent всегда ждет все 10 секунд, и только после этого поток, в который я получаю сообщения, продолжит работу и, таким образом, получит ответ. Как это возможно, когда я получаю в другом потоке? Я думаю, что моя программа продолжает получать в этом отдельном потоке, даже когда CountdownEvent.Wait() активен.

Примечание. Ожидаемый ответ действительно возвращается сразу же после того, как я разместил запрос, как показано в NetworkTool WireShark. Так что время ожидания неверно.

Редактировать: В простом WPF-приложении, где SendRequest () вызывается с кнопки, он работает. К сожалению, это означает, что проблема в моей большой программе.

Услуги:

public class Service
{
    private readonly ResponseHandler _responseHandler;
    private readonly SyncSocketServer _serverSocket;

    private static readonly int ServerPort = 9090;

    public Service()
    {
        _responseHandler = new ResponseHandler();

        _serverSocket = new SyncSocketServer(ServerPort);
        _serverSocket.StartListening();
        _serverSocket.DataReceived += ServerSocket_DataReceived;
    }

    public void ServerSocket_DataReceived(object sender, string message)
    {
        // Here I left irrelevant code out: Originally, I check here,
        // whether the message is a request or response and so on, and 
        // I only forward the message to the _responseHandler, if it is
        // indeed a response. If it is a request I send an answer.

        string messageId = GetIdFromMessage(message);
        _responseHandler.DataReceived(messageId, message);
    }

    public void SendRequest(string message)
    {
        string messageId = Guid.NewGuid().ToString();
        string request = CreateRequest(messageId, message);

        _responseHandler.Register(messageId);
        _serverSocket.Send(request);
        string response = _responseHandler.WaitForResponse(messageId);

        Debug.WriteLine("I got the correct response: " + response);
    }
}

SyncSocketServer:

public class SyncSocketServer
{
    public event EventHandler<string> DataReceived;

    private const int BufferSize = 1024;
    private const string EndDelimiter = "\n";

    private Socket _listenerSocket;
    private Socket _client;
    private string _data;
    private Byte[] _buffer;

    private readonly int _port;

    public SyncSocketServer(int port)
    {
        _port = port;
        _buffer = new Byte[BufferSize];
    }

    public void StartListening()
    {
        IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
        IPAddress ipAddress = ipHostInfo.AddressList[3];
        IPEndPoint localEndPoint = new IPEndPoint(ipAddress, _port);

        _listenerSocket = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);

        _listenerSocket.Bind(localEndPoint);
        _listenerSocket.Listen(5);

        _client = _listenerSocket.Accept();
        Debug.WriteLine("Local socket opened on: {0}", _listenerSocket.LocalEndPoint);

        StartReceiving();
    }

    private void StartReceiving()
    {
        Thread d = new Thread(() => {
            Thread.CurrentThread.IsBackground = true;
            while (true)
            {
                _data = null;

                while (true)
                {
                    int bytesReceived = _client.Receive(_buffer);
                    _data += Encoding.ASCII.GetString(_buffer, 0, bytesReceived);

                    if (_data.IndexOf(EndDelimiter, StringComparison.OrdinalIgnoreCase) > -1)
                        break;
                }

                Debug.WriteLine("Message received:" + _data);
                OnDataReceived(_data);
            }
        });
        d.Start();
    }

    public void Send(string message)
    {
        byte[] bytesMessage = Encoding.ASCII.GetBytes(message + EndDelimiter);
        _client.Send(bytesMessage);
        Debug.WriteLine("Message sent: " + message);
    }

    protected virtual void OnDataReceived(string data)
    {
        EventHandler<string> handler = DataReceived;

        if (handler != null)
            handler(this, data);
    }
}

ResponseHandler:

public class ResponseHandler
{
    private const int WaitForResponseTimeout = 10000;

    private readonly Dictionary<string, PendingRequest> _pendingRequests;

    public ResponseHandler()
    {
        _pendingRequests = new Dictionary<string, PendingRequest>();
    }

    public void DataReceived(string messageId, string response)
    {
        _pendingRequests.TryGetValue(messageId, out var pendingRequest);

        if (pendingRequest == null)
            Debug.WriteLine("Received response for request, that has been removed");
        else
        {
            pendingRequest.ResponseReceived(response);
            _pendingRequests.Remove(messageId);
        }
    }

    public void Register(string messageId)
    {
        _pendingRequests.Add(messageId, new PendingRequest());
    }

    public string WaitForResponse(string messageId)
    {
        _pendingRequests.TryGetValue(messageId, out var pendingRequest);

        if (pendingRequest == null)
            return null;

        pendingRequest.Await();
        return pendingRequest.Response;
    }

    private class PendingRequest
    {
        public string Response { get; private set; }

        private readonly CountdownEvent _countdownEvent;

        public PendingRequest()
        {
            _countdownEvent = new CountdownEvent(1);
        }

        public void Await()
        {
            // Here, the current thread gets blocked, but
            // I expect, that the thread, where I receive
            // would continue receiving
            _countdownEvent.Wait(WaitForResponseTimeout);
        }

        public void ResponseReceived(stringresponse)
        {
            Response = response;
            _countdownEvent.Signal();
        }
    }
}

Ответы [ 2 ]

0 голосов
/ 10 сентября 2018

Проблема на самом деле является ложным предположением, что запуск события, как я сделал ниже, приведет к пожару и забудет:

protected virtual void OnDataReceived(string data)
{
    EventHandler<string> handler = DataReceived;

    if (handler != null)
        handler(this, data);
}

В функции StartReceiving(), где я получаю данные и пересылаю их подписчикам, он приостанавливает вызов, который инициирует событие, и ждет, пока все подписчики завершат свою работу (что включает, конечно, ожидание 10). секунд за ответ). Это приводит к тому, что мой поток-получатель ожидает другого потока.


Решение состоит в том, чтобы выполнить вызов, чтобы он выстрелил и забыл:

protected virtual void OnDataReceived(string data)
{
    EventHandler<string> handler = DataReceived;

    if (handler != null)
        handler.BeginInvoke(this, data, null, null);
}
0 голосов
/ 07 сентября 2018

Итак, ваши классы PendingRequest и ResponseHandler доступны из разных потоков. Итак, для здравомыслия вашей программы вам нужно сделать несколько вещей:

a) Убедитесь, что при добавлении и удалении запросов из словаря ожидающих запросов вы получаете блокировку, поскольку вы одновременно получаете доступ к общей структуре данных из разных потоков. В противном случае вы можете испортить свою структуру данных.

б) Ваша более насущная проблема - метод Await() в PendingRequest. Вы звоните CountdownEvent.Wait() без подтверждения, что ваш ответ уже установлен. Если ваш ответ уже установлен, это будет означать, что вы будете ждать 10 секунд, прежде чем его обработать. Это может произойти, если ваш ответ прибудет, даже до того, как вы вызовете CountdownEvent.Wait(). В этом случае CountdownEvent.Signal() будет просто проигнорировано. Вам следует изменить PendingRequest.Wait() следующим образом:

while (Response is not set) {
      CountdownEvent.Await();
}

Кроме того, не требуется ли вашему семафору CountdownEvent.Wait() мьютекс для передачи ему? Помните, что ваш Response объект распределяется между потоками. Это общая парадигма использования метода wait ():

mutex.lock();
while (Response is not set) {
          CountdownEvent.Await(mutex);
    }

// Do your stuff, since your condition is satisfied
mutext.unlock();
...