быстрый параллельный TcpClients с await BeginConnect новый поток импульсов - PullRequest
1 голос
/ 12 ноября 2011

Я пытаюсь установить TCP-соединение с несколькими параллельными IP-адресами и сделать это как можно быстрее.Я преобразовал некоторый старый код для использования AsyncCTP для этой цели, вводя параллелизм.

Изменения в дизайне и скорости, а также доступ к успешным соединениям?

У меня три вопроса:

  • Насколько плох следующий поток / что я должен изменить?
    • то есть ожидание запускает несколько параллельных потоков TcpRequest,
    • , но внутри каждого TcpRequest есть tcpClient.BeginConnect
    • , а также другой поток, вызываемый для чтения (еслисоединение установлено)
    • и запись в поток выполняется с помощью механизма Wait / Pulse в цикле while.
  • Во-вторых, как я могу ускорить процесс подключения к ряду целей?
    • В настоящее время, если цели ip: port на самом деле не работают на каких-либо серверах, я получаю «All Done», напечатанный через 18 секунд после начала, когда я пытаюсь подключиться к 500 локальным целям (которыене прослушивает и, следовательно, не работает на этих портах).
  • Как получить доступ к методу успешных подключений WriteToQueue с mothership ?

Async Mothership пытается подключиться ко всем целевым объектам параллельно

// First get a bunch of IPAddress:Port targets
var endpoints = EndPointer.Get(); 
// Try connect to all those targets
var tasks = from t in topList select TcpRequester.ConnectAsync(t);
await TaskEx.WhenAll(tasks);
Debug.WriteLine("All Done");

Статический метод доступа для отдельных задач TcpRequest

    public static Task<TcpRequester> ConnectAsync(IPEndPoint endPoint)
    {
        var tcpRequester = Task<TcpRequester>.Factory.StartNew(() =>
                                             {
                                             var request = new TcpRequester();
                                             request.Connect(endPoint); 
                                             return request;
                                             }
                                             );
        return tcpRequester;
    }

TcpRequester с BeginConnect TimeOut и новым потоком для чтения

    public void Connect(IPEndPoint endPoint)
    {
           TcpClient tcpClient = null;
           Stream stream = null;
           using (tcpClient = new TcpClient())
            {
                tcpClient.ReceiveTimeout = 1000;
                tcpClient.SendTimeout = 1000;
                IAsyncResult ar = tcpClient.BeginConnect(endPoint.Address, endPoint.Port, null, null);
                WaitHandle wh;
                wh = ar.AsyncWaitHandle;
                try
                {
                    if (!ar.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(1000), false))
                    {
                        throw new TimeoutException();
                    }

                    if (tcpClient.Client != null)
                    {
                        // Success
                        tcpClient.EndConnect(ar);
                    }

                    if (tcpClient.Connected)
                    {
                        stream = tcpClient.GetStream();
                    }

                    // Start to read stream until told to close or remote close
                    ThreadStart reader = () => Read(stream);
                    // Reading is done in a separate thread
                    var thread = new Thread(reader);
                    thread.Start();

                    // See Writer method below
                    Writer(stream);

                } finally
                {
                    wh.Close();
                }
            }
        } catch (Exception ex)
        {
            if (tcpClient != null)
                tcpClient.Close();
        }
    }
    }

Запись в поток с ожиданием и импульсом

    readonly Object _writeLock = new Object();

    public void WriteToQueue(String message)
    {
            _bytesToBeWritten.Add(Convert(message));

            lock (_writeLock)
            {
                 Monitor.Pulse(_writeLock);
            }
    }

    void Writer(Stream stream)
    {
        while (!_halt)
        {
            while (_bytesToBeWritten.Count > 0 && !_halt)
            {
                // Write method does the actual writing to the stream:
                if (Write(stream, _bytesToBeWritten.ElementAt(0)))
                {
                    _bytesToBeWritten.RemoveAt(0);
                } else
                {
                    Discontinue();
                }
            }
            if (!(_bytesToBeWritten.Count > 0) && !_halt)
            {
                lock (_writeLock)
                {
                    Monitor.Wait(_writeLock);
                }
            }
        }
        Debug.WriteLine("Discontinuing Writer and TcpRequester");
    }

1 Ответ

2 голосов
/ 15 ноября 2011

Есть несколько красных флажков, которые появляются при беглом взгляде.

  • У вас есть Stream, который принимает чтение и запись, но нет четкого указания на то, что операции были синхронизированы надлежащим образом. В документации говорится, что методы экземпляра Stream не безопасны для многопоточных операций.
  • Похоже, что синхронизация вокруг операций с _bytesToBeWritten.
  • Получение lock исключительно для выполнения Monitor.Wait и Monitor.Pulse немного странно, если не совсем неправильно. Это в основном эквивалентно использованию ManualResetEvent.
  • Практически никогда не правильно использовать Monitor.Wait без while цикла. Чтобы понять, почему вы должны понимать цель пульсации и ожидания на lock. Это действительно выходит за рамки этого ответа.
  • Похоже, что методы Writer и WriteToQueue являются попыткой создать очередь производитель-потребитель. .NET BCL уже содержит внутреннюю информацию для этого через класс BlockingCollection.

Что бы это ни стоило, я не вижу ничего ужасного в общем подходе и использовании ключевого слова await.

...