Я пытаюсь установить TCP-соединение с несколькими параллельными IP-адресами и сделать это как можно быстрее.Я преобразовал некоторый старый код для использования AsyncCTP для этой цели, вводя параллелизм.
Изменения в дизайне и скорости, а также доступ к успешным соединениям?
У меня три вопроса:
- Насколько плох следующий поток / что я должен изменить?
- то есть ожидание запускает несколько параллельных потоков TcpRequest,
- , но внутри каждого TcpRequest есть tcpClient.BeginConnect
- , а также другой поток, вызываемый для чтения (еслисоединение установлено)
- и запись в поток выполняется с помощью механизма Wait / Pulse в цикле while.
- Во-вторых, как я могу ускорить процесс подключения к ряду целей?
- В настоящее время, если цели ip: port на самом деле не работают на каких-либо серверах, я получаю «All Done», напечатанный через 18 секунд после начала, когда я пытаюсь подключиться к 500 локальным целям (которыене прослушивает и, следовательно, не работает на этих портах).
- Как получить доступ к методу успешных подключений WriteToQueue с mothership ?
Async Mothership пытается подключиться ко всем целевым объектам параллельно
// First get a bunch of IPAddress:Port targets
var endpoints = EndPointer.Get();
// Try connect to all those targets
var tasks = from t in topList select TcpRequester.ConnectAsync(t);
await TaskEx.WhenAll(tasks);
Debug.WriteLine("All Done");
Статический метод доступа для отдельных задач TcpRequest
public static Task<TcpRequester> ConnectAsync(IPEndPoint endPoint)
{
var tcpRequester = Task<TcpRequester>.Factory.StartNew(() =>
{
var request = new TcpRequester();
request.Connect(endPoint);
return request;
}
);
return tcpRequester;
}
TcpRequester с BeginConnect TimeOut и новым потоком для чтения
public void Connect(IPEndPoint endPoint)
{
TcpClient tcpClient = null;
Stream stream = null;
using (tcpClient = new TcpClient())
{
tcpClient.ReceiveTimeout = 1000;
tcpClient.SendTimeout = 1000;
IAsyncResult ar = tcpClient.BeginConnect(endPoint.Address, endPoint.Port, null, null);
WaitHandle wh;
wh = ar.AsyncWaitHandle;
try
{
if (!ar.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(1000), false))
{
throw new TimeoutException();
}
if (tcpClient.Client != null)
{
// Success
tcpClient.EndConnect(ar);
}
if (tcpClient.Connected)
{
stream = tcpClient.GetStream();
}
// Start to read stream until told to close or remote close
ThreadStart reader = () => Read(stream);
// Reading is done in a separate thread
var thread = new Thread(reader);
thread.Start();
// See Writer method below
Writer(stream);
} finally
{
wh.Close();
}
}
} catch (Exception ex)
{
if (tcpClient != null)
tcpClient.Close();
}
}
}
Запись в поток с ожиданием и импульсом
readonly Object _writeLock = new Object();
public void WriteToQueue(String message)
{
_bytesToBeWritten.Add(Convert(message));
lock (_writeLock)
{
Monitor.Pulse(_writeLock);
}
}
void Writer(Stream stream)
{
while (!_halt)
{
while (_bytesToBeWritten.Count > 0 && !_halt)
{
// Write method does the actual writing to the stream:
if (Write(stream, _bytesToBeWritten.ElementAt(0)))
{
_bytesToBeWritten.RemoveAt(0);
} else
{
Discontinue();
}
}
if (!(_bytesToBeWritten.Count > 0) && !_halt)
{
lock (_writeLock)
{
Monitor.Wait(_writeLock);
}
}
}
Debug.WriteLine("Discontinuing Writer and TcpRequester");
}