Как получить буфер принятых сообщений из подключенного клиентского сокета с использованием расширения Socket и Reactive (Rx) - PullRequest
1 голос
/ 27 июля 2011

Поскольку я немного новичок в Rx и изучаю свой путь через него.Я проверил множество примеров, но ни один из них не соответствует моим потребностям.

Сценарий: у меня есть один сокет сервера сокетов (созданный с использованием простого объекта сокета, а не объекта TCPListener).У меня есть несколько клиентов (клиентский сокет, а не TCPClient), подключенных к этому сокету сервера.Я пытаюсь использовать Rx (расширение Reactive) для получения сообщений, отправляемых клиентским сокетом, используя оператор «FromAsyncPattern» для асинхронной операции BeginReceive и EndReceive.

Я почти начал получать сообщения в буфере.Но проблема в том, что буфер возвращает либо то же самое сообщение, полученное от предыдущей операции, либо будет иметь смешанный контент из текущей и предыдущей операции приема.

Вот код, используемый:Метод является просто делегатом, чтобы поднять событие получения сообщения и отправить буфер другому классу для дальнейшей обработки.

Проблема: Тот же самый буфер используется всегда, когда я получаю сообщение, как я могу получитьсообщение получено чистым из предыдущего полученного буфера.

- Что нужно сделать, если я получил сообщение, полученное частично, и следующее сообщение фактически является частью этого же сообщения.

Пожалуйста, помогите мне сВыше проблема с некоторыми фрагментами кода, которые были бы замечательно.Причина публикации этого вопроса заключается в том, что реализация, которую я до сих пор проходил, использует TCPListener, и здесь ограничение заключается в использовании объекта сокета: (

Заранее спасибо.

1 Ответ

2 голосов
/ 28 июля 2011

Проблема, с которой вы сталкиваетесь, заключается в том, что вы используете один и тот же экземпляр буфера между повторными вызовами вашей наблюдаемой. Вы должны убедиться, что у вас есть новый экземпляр буфера каждый раз. Также кажется, что вы должны обрезать буфер на основе целого числа возврата. Вот то, что я предлагаю включить в ваше заявление if:

var functionReceiveSocketData =
    Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
        (socket.BeginReceive, socket.EndReceive);

Func<byte[], int, byte[]> copy = (bs, n) =>
{
    var rs = new byte[n];
    bs.CopyTo(rs, 0);
    return rs;
};

Observable
    .Defer(() =>
    {
        var buffer = new byte[400];
        return 
            from n in functionReceiveSocketData(buffer, 0, 400, SocketFlags.None)
            select copy(buffer, n);
    })
    .Repeat()
    .Subscribe(x => OnMessageReceived(x));

РЕДАКТИРОВАТЬ: В ответ на комментарий повторное присоединение частей сообщения. Также исправлено решение выше - должно быть 0, а не n в функции CopyTo в copy лямбда.

Я не эксперт по использованию сокетов, но, поскольку сокет - это (и исправьте меня, если я ошибаюсь) всего лишь поток байтов, вам нужно будет определить, когда каждое сообщение в потоке полный.

Вот один из возможных способов сделать это:

/* include `functionReceiveSocketData` & `copy` from above */ =

Func<byte[], byte[], byte[]> append = (bs1, bs2) =>
{
    var rs = new byte[bs1.Length + bs2.Length];
    bs1.CopyTo(rs, 0);
    bs2.CopyTo(rs, bs1.Length);
    return rs;
};

var messages = new Subject<byte[]>();
messages.Subscribe(x => OnMessageReceived(x));

Observable
    .Defer(() => /* as above */)
    .Repeat()
    .Scan(new byte[] { }, (abs, bs) =>
    {
        var current = append(abs, bs);
        if (isCompleteMessage(current))
        {
            messages.OnNext(current);
            current = new byte[] { };
        }
        return current;
    })
    .Subscribe();

Единственное, что вам нужно сделать, это выяснить, как реализовать функцию isCompleteMessage.

...