Как написать из нескольких потоков в TcpListener? - PullRequest
0 голосов
/ 15 февраля 2019

Допустим, у меня есть статический список List<string> dataQueue, где данные продолжают добавляться через случайные интервалы, а также с переменной скоростью (1-1000 записей в секунду) .

MyОсновная цель - отправить данные из списка на сервер, я использую класс TcpClient.

На данный момент я синхронно отправляю данные клиенту вОдин поток

byte[] bytes = Encoding.ASCII.GetBytes(message);

tcpClient.GetStream().Write(bytes, 0, bytes.Length);
//The client is already connected at the start

И я удаляю запись из списка, как только данные отправляются.

Это работает нормально, но скорость отправки данных недостаточно высока,список заполняется и занимает больше памяти, так как список повторяется и отправляется один за другим.

Мой вопрос: могу ли я использовать один и тот же объект tcpClient для одновременной записи из другого потока или могу использовать другой tcpClient объект с новым подключением к тому же серверу в другом потоке?Какой самый эффективный (самый быстрый) способ отправить эти данные на сервер?

PS: я не хочу использовать UDP

1 Ответ

0 голосов
/ 15 февраля 2019

справа;это забавная тема, о которой, я думаю, я могу высказаться.Похоже, вы разделяете один сокет между несколькими потоками - совершенно верно , пока вы делаете это очень осторожно.Сокет TCP представляет собой логический поток байтов, поэтому вы не можете использовать его одновременно как таковой, но если ваш код достаточно быстр, вы можете очень эффективно использовать сокет, причем каждое сообщение .

Вероятно, самая первая вещь, на которую стоит обратить внимание: как вы на самом деле записываете данные в сокет?каков ваш код кадрирования / кодирования?Если этот код просто плохой / неэффективный: его, вероятно, можно улучшить.Например, это косвенно создает новый byte[] за string через наивный Encode вызов?Есть несколько задействованных буферов?Это звонит Send несколько раз во время кадрирования?Как это подходит к проблеме фрагментации пакетов?etc

В качестве самой первой вещи, которую можно попробовать - вы можете избежать некоторых выделений буфера:

var enc = Encoding.ASCII;
byte[] bytes = ArrayPool<byte>.Shared.Rent(enc.GetMaxByteCount(message.Length));
// note: leased buffers can be oversized; and in general, GetMaxByteCount will
// also be oversized; so it is *very* important to track how many bytes you've used
int byteCount = enc.GetBytes(message, 0, message.Length, bytes, 0);
tcpClient.GetStream().Write(bytes, 0, byteCount);
ArrayPool<byte>.Shared.Return(bytes);

При этом используется арендованный буфер, чтобы избежать создания byte[] каждоговремя, которое может значительно улучшить влияние ГХ.Если бы это был я, я бы также использовал необработанные Socket, а не TcpClient и Stream абстракции, которые, откровенно говоря, не приносят вам много пользы.Примечание: если вам нужно выполнить другое кадрирование: включите его в размер буфера, который вы арендуете, используйте соответствующие смещения при записи каждого фрагмента и пишите только один раз - т.е. подготовьте весь буфер один раз - избегайте многократных вызововна Send.


Прямо сейчас, похоже, у вас есть очередь и выделенный автор;т. е. ваш код app добавляется в очередь, а ваш код writer снимает с очереди и записывает их в сокет.Это разумный способ реализации вещей, хотя я бы добавил некоторые примечания:

  • List<T> - это ужасный способ реализации очереди - удаление вещей с самого начала требуетперестановка всего остального (что дорого);если возможно, предпочтите Queue<T>, который идеально подходит для вашего сценария
  • , это потребует синхронизации, то есть вам нужно убедиться, что только один поток изменяет очередь за раз - это обычно делается с помощью простого lock, то есть lock(queue) {queue.Enqueue(newItem);} и SomeItem next; lock(queue) { next = queue.Count == 0 ? null : queue.Dequeue(); } if (next != null) {...write it...}.

Этот подход простой и имеет некоторые преимущества с точки зрения предотвращения фрагментации пакета - средство записи может использовать промежуточный буфери только на самом деле записывает в сокет, когда определенный порог буферизован или когда очередь пуста, например - но у него есть возможность создать огромное отставание при возникновении остановок.

Однако!Тот факт, что произошло отставание, указывает на то, что что-то не идет в ногу;это может быть сеть (пропускная способность), удаленный сервер (ЦП) или, возможно, оборудование локальной исходящей сети.Если это происходит только небольшими всплесками, которые затем разрешаются сами собой - хорошо (особенно, если это происходит, когда некоторые исходящие сообщения огромны), но: один для просмотра.

Если этот вид невыполнения повторяется, тоОткровенно говоря, вы должны учитывать, что вы просто насыщены текущим дизайном, поэтому вам нужно разблокировать одну из точек зажима:

  • , чтобы убедиться, что ваш код кодирования эффективен, это нулевой шаг
  • вы можете переместить шаг кодирования в код приложения, то есть подготовить кадр перед тем, как захватит блокировку, закодировать сообщение и поставить в очередь только полностью подготовленный кадр;это означает, что поток записи не должен делать ничего, кроме удаления из очереди, записи, перезапуска - но это делает управление буфером более сложным (очевидно, вы не можете перерабатывать буферы, пока они не будут полностью обработаны)
  • сокращениефрагментация пакетов может существенно помочь, если вы еще не предприняли шаги для достижения этого
  • , в противном случае вам может потребоваться (после изучения блокировки):
    • улучшенное оборудование локальной сети (NIC) или физическоеаппаратное обеспечение (процессор и т. д.)
    • несколько сокетов (и очередей / рабочих) для циклического распределения между ними, распределение нагрузки
    • возможно, несколько серверов процессов , с портом на сервер, так что ваши несколько сокетов взаимодействуют с разнымипроцессы
    • лучший сервер
    • несколько серверов

Примечание: при любом сценарии с несколькими сокетами вы должны быть осторожны, чтобы несойти с ума и иметь слишком много выделенных рабочих потоков;если это число превысит, скажем, 10 потоков, вы , вероятно, захотите рассмотреть другие варианты - возможно, с использованием асинхронного ввода-вывода и / или конвейеров (ниже).


Для полноты, другой базовый подход заключается в написании из кода приложения ;этот подход еще проще и позволяет избежать отставания от неотправленной работы, но: это означает, что теперь ваши потоки кода приложения сами будут выполнять резервное копирование под нагрузкой.Если ваши потоки кода приложения на самом деле являются рабочими потоками, и они заблокированы по синхронизации / lock, то это может быть действительно плохо;вы не хотите насытить пул потоков, так как вы можете оказаться в сценарии, когда нет потоков, доступных для пула потоков, чтобы выполнить работу ввода-вывода, необходимую для разблокирования того, кто пишетактивный, который может вызвать у вас реальных проблем.Обычно это не та схема, которую вы хотите использовать для высокой нагрузки / объема, поскольку она очень быстро становится проблематичной - и очень сложно избежать фрагментации пакетов, поскольку каждое отдельное сообщение не может знать, собирается ли поступить больше сообщений..


Еще один вариант, который следует рассмотреть в последнее время, - это «конвейеры»;это новая инфраструктура ввода-вывода в .NET, предназначенная для работы в больших объемах, уделяя особое внимание таким вещам, как асинхронный ввод-вывод, повторное использование буфера и хорошо реализованный механизм буфера / обратного журнала, позволяющий использовать простойподход писателя (синхронизировать во время записи) и не переводить его в прямую отправку - он проявляется как асинхронный писатель с доступом к заделу, что делает предотвращение фрагментации пакетов простым и эффективным.Это довольно продвинутая область, но она может быть очень эффективной.Проблемной частью для вас будет: он предназначен для асинхронного использования во всех случаях, даже для записи - поэтому, если ваш код приложения в настоящее время является синхронным, это может быть затруднительно для реализации.Но: это область для рассмотрения.У меня есть несколько постов в блогах, посвященных этой теме, и ряд примеров OSS и реальных библиотек, в которых используются конвейеры, на которые я могу вам указать, но: это не «быстрое решение» - эторадикальный ремонт всего вашего IO слоя.Это также не волшебная палочка - она ​​может только устранить накладные расходы из-за затрат на локальную обработку ввода-вывода.

...