Как заставить сокет UDP заменять старые сообщения (еще не recv () и d) при поступлении новых? - PullRequest
6 голосов
/ 11 августа 2010

Сначала немного контекста, объясняющего, почему я нахожусь на маршруте "UDP-выборка":
Я хотел бы попробовать данные, полученные с высокой скоростью в течение неизвестного периода времени. Данные, которые я хочу получить, находятся на другом компьютере, а не на том, который потребляет данные. У меня есть выделенное соединение Ethernet между ними, поэтому пропускная способность не является проблемой. У меня проблема в том, что машина, потребляющая данные, работает намного медленнее, чем та, которая их производит. Дополнительным ограничением является то, что, хотя все в порядке, что я не получаю все образцы (это просто образцы), обязательно я получу последний один.

Мое первое решение состояло в том, чтобы производитель данных отправлял дейтаграмму UDP для каждого полученного сэмпла и позволил потребителю данных попытаться получить сэмплы, которые он мог, и позволить другим отбрасываться уровнем сокетов, когда сокет UDP заполнен. Проблема с этим решением заключается в том, что при поступлении новых дейтаграмм UDP и заполнении сокета отбрасываются новые дейтаграммы, а не старые . Поэтому я не гарантирован, чтобы иметь последний!

У меня вопрос: есть ли способ заставить сокет UDP заменять старые дейтаграммы при поступлении новых?

Приемник в настоящее время является машиной Linux, но это может измениться в пользу другой Unix-подобной ОС в будущем (Windows может быть возможной, поскольку она реализует сокеты BSD, но менее вероятно)
Идеальное решение будет использовать широко распространенные механизмы (например, setsockopt ()) для работы.

PS: я думал о других решениях, но они более сложны (включают в себя серьезную модификацию отправителя), поэтому я хотел бы сначала иметь определенный статус относительно осуществимости того, что я прошу! :)

Обновление: - Я знаю, что ОС на принимающей машине может обрабатывать сетевую нагрузку + повторную сборку трафика, генерируемого отправителем. Просто его поведение по умолчанию - отбрасывать новые дейтаграммы, когда буфер сокета заполнен. И из-за времени обработки в процессе получения, я знаю, что он будет заполнен, что бы я ни делал (напрасно тратить половину памяти в буфере сокета:)).
- Я действительно хотел бы избежать того, чтобы вспомогательный процесс делал то, что могла бы сделать ОС во время диспетчеризации пакетов, и тратил бы впустую ресурсы, просто копируя сообщения в SHM.
- Проблема, с которой я сталкиваюсь при изменении отправителя, состоит в том, что код, к которому у меня есть доступ, является просто функцией PleaseSendThisData (), она не знает, что это может быть последний раз, когда она вызывается раньше, поэтому я не увидеть любые выполнимые трюки на этом конце ... но я открыт для предложений! :)

Если на самом деле нет способа изменить поведение приема UDP в BSD-сокете, тогда хорошо ... просто скажите, я готов принять эту ужасную правду и начну работать над решением "вспомогательного процесса", когда я вернитесь к нему:)

Ответы [ 6 ]

5 голосов
/ 11 августа 2010

Просто установите сокет в неблокирующее состояние и включите recv(), пока он не вернет <0 с <code>errno == EAGAIN. Затем обработайте последний полученный пакет, промойте и повторите.

4 голосов
/ 11 августа 2010

Я согласен с "кафе". Установите сокет в неблокирующий режим.

Всякий раз, когда вы получаете что-то в сокете - читайте в цикле, пока ничего не останется Затем обработайте последнюю прочитанную дейтаграмму.

Только одно примечание: вы должны установить большой системный приемный буфер для сокета

int nRcvBufSize = 5*1024*1024; // or whatever you think is ok
setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char*) &nRcvBufSize, sizeof(nRcvBufSize));
2 голосов
/ 11 августа 2010

Это будет трудно получить полностью правильно только на стороне слушателя, поскольку он может фактически пропустить последний пакет в чипе сетевого интерфейса, что помешает вашей программе когда-либо иметь шанс увидеть его.

UDP-код операционной системы был бы лучшим местом, чтобы попытаться справиться с этим, поскольку он будет получать новые пакеты, даже если он решит их отбросить, поскольку у него уже слишком много очередей. Тогда он мог бы принять решение об отказе от старого или отбрасывании нового, но я не знаю, как сказать ему, что это именно то, что вы хотели бы сделать.

Вы можете попытаться справиться с этим на приемнике, имея одну программу или поток, который всегда пытается прочитать в самом новом пакете, и другой, который всегда пытается получить этот самый новый пакет. Как это сделать, будет зависеть от того, сделаете ли вы это как две отдельные программы или как два потока.

В качестве потоков вам понадобится мьютекс (семафор или что-то в этом роде) для защиты указателя (или ссылки) на структуру, используемую для хранения 1 полезной нагрузки UDP и всего, что вам нужно (размер, IP-адрес отправителя, порт отправителя, отметка времени и т. д.).

Поток, который фактически читает пакеты из сокета, будет хранить данные пакета в структуре, получит мьютекс, защищающий этот указатель, заменит текущий указатель на указатель на только что созданную структуру, освободит мьютекс, сообщит процессору Поток, которому нужно что-то сделать, а затем очистить структуру, на которую он только что получил указатель, и использовать ее для хранения следующего входящего пакета.

Поток, который фактически обрабатывал полезную нагрузку пакета, должен ожидать сигнала от другого потока и / или периодически (возможно, для этого вам понадобится 500 мс или около того, и вы решите) и получить мьютекс, поменять местами его указатель на структура полезной нагрузки UDP с той, которая существует, освобождает мьютекс, и затем, если структура имеет какие-либо пакетные данные, она должна обработать ее и затем ждать следующего сигнала. Если у него не было никаких данных, он должен просто идти вперед и ждать следующего сигнала.

Поток процессора, вероятно, должен работать с более низким приоритетом, чем слушатель UDP, так что слушатель с меньшей вероятностью когда-либо пропустит пакет. При обработке последнего пакета (того, который вас действительно волнует) процессор не будет прерван, потому что нет новых пакетов, которые слушатель мог бы услышать.

Вы можете расширить это, используя очередь, а не просто один указатель в качестве места обмена для двух потоков. Единственный указатель - это просто очередь длиной 1, которую очень легко обрабатывать.

Вы также можете расширить это, попытавшись заставить поток слушателя определить, есть ли несколько ожидающих пакетов, и только фактически поместить последние из них в очередь для потока процессора. То, как вы это сделаете, будет зависеть от платформы, но если вы используете * nix, это должно вернуть 0 для сокетов, ничего не ожидая:

while (keep_doing_this()) {
    ssize_t len = read(udp_socket_fd, my_udp_packet->buf, my_udp_packet->buf_len); 
    // this could have been recv or recvfrom
    if (len < 0) {
        error();
    }
    int sz;
    int rc = ioctl(udp_socket_fd, FIONREAD, &sz);
    if (rc < 0) {
        error();
    }
    if (!sz) {
        // There aren't any more packets ready, so queue up the one we got
        my_udp_packet->current_len = len;

        my_udp_packet = swap_udp_packet(my_ucp_packet);
        /* swap_udp_packet is code you would have to write to implement what I talked
           about above. */

        tgkill(this_group, procesor_thread_tid, SIGUSR1);
    } else if (sz > my_udp_packet->buf_len) {
        /* You could resize the buffer for the packet payload here if it is too small.*/
    }
}

udp_packet должен быть выделен для каждого потока, а также 1 для указателя замены. Если вы используете очередь для обмена, то у вас должно быть достаточно udp_packets для каждой позиции в очереди - поскольку указатель - это просто очередь длиной 1, для которой нужно только 1.

Если вы используете систему POSIX, подумайте о том, чтобы не использовать сигнал реального времени для сигнализации, потому что они стоят в очереди. Использование обычного сигнала позволит вам обрабатывать сигналы, передаваемые много раз, так же, как сигналы только один раз, пока сигнал не будет обработан, а сигналы в режиме реального времени выстраиваются в очередь. Периодическое пробуждение для проверки очереди также позволяет обрабатывать возможность получения последнего сигнала сразу после того, как вы проверили, есть ли у вас какие-либо новые пакеты, но перед вызовом pause для ожидания сигнала.

1 голос
/ 11 августа 2010

Я почти уверен, что это доказуемо неразрешимая проблема, тесно связанная с проблемой двух армий .

Я могу придумать грязное решение: установить TCP-соединение «control» с боковой полосой, которое несет последний пакет, который также является индикацией «end end».В противном случае вам нужно использовать одно из более общих прагматических средств, отмеченных в Технические подходы .

1 голос
/ 11 августа 2010

Другая идея состоит в том, чтобы иметь специальный процесс чтения, который делает только циклы в сокете и считывает входящие пакеты в кольцевой буфер в разделяемой памяти (вам придется беспокоиться о правильном порядке записи). Что-то вроде kfifo. Неблокирование здесь тоже хорошо. Новые данные переопределяют старые данные. Тогда другой процесс (ы) всегда будет иметь доступ к последнему блоку в начале очереди и всех предыдущих чанков, которые еще не перезаписаны.

Может быть слишком сложно для простого одностороннего считывателя, просто опция.

0 голосов
/ 12 января 2019

Это старый вопрос, но вы хотите превратить очередь сокетов (FIFO) в стек (LIFO).Это невозможно, если только вы не хотите возиться с ядром.

Вам нужно переместить дейтаграммы из пространства ядра в пространство пользователя и затем обработать.Самый простой подход - это цикл вроде этого ...

  1. Блокировать до тех пор, пока в сокете не появятся данные (см. Select, poll, epoll)
  2. Слить сокет, сохраняя датаграммы в соответствии с вашими данными.собственная политика выбора
  3. Обработка сохраненных дейтаграмм
  4. Повтор
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...