Проблема, с которой вы сталкиваетесь, заключается в том, что вы используете один и тот же экземпляр буфера между повторными вызовами вашей наблюдаемой. Вы должны убедиться, что у вас есть новый экземпляр буфера каждый раз. Также кажется, что вы должны обрезать буфер на основе целого числа возврата. Вот то, что я предлагаю включить в ваше заявление if
:
var functionReceiveSocketData =
Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
(socket.BeginReceive, socket.EndReceive);
Func<byte[], int, byte[]> copy = (bs, n) =>
{
var rs = new byte[n];
bs.CopyTo(rs, 0);
return rs;
};
Observable
.Defer(() =>
{
var buffer = new byte[400];
return
from n in functionReceiveSocketData(buffer, 0, 400, SocketFlags.None)
select copy(buffer, n);
})
.Repeat()
.Subscribe(x => OnMessageReceived(x));
РЕДАКТИРОВАТЬ: В ответ на комментарий повторное присоединение частей сообщения. Также исправлено решение выше - должно быть 0
, а не n
в функции CopyTo
в copy
лямбда.
Я не эксперт по использованию сокетов, но, поскольку сокет - это (и исправьте меня, если я ошибаюсь) всего лишь поток байтов, вам нужно будет определить, когда каждое сообщение в потоке полный.
Вот один из возможных способов сделать это:
/* include `functionReceiveSocketData` & `copy` from above */ =
Func<byte[], byte[], byte[]> append = (bs1, bs2) =>
{
var rs = new byte[bs1.Length + bs2.Length];
bs1.CopyTo(rs, 0);
bs2.CopyTo(rs, bs1.Length);
return rs;
};
var messages = new Subject<byte[]>();
messages.Subscribe(x => OnMessageReceived(x));
Observable
.Defer(() => /* as above */)
.Repeat()
.Scan(new byte[] { }, (abs, bs) =>
{
var current = append(abs, bs);
if (isCompleteMessage(current))
{
messages.OnNext(current);
current = new byte[] { };
}
return current;
})
.Subscribe();
Единственное, что вам нужно сделать, это выяснить, как реализовать функцию isCompleteMessage
.