Постоянно читаете из потока? - PullRequest
14 голосов
/ 07 мая 2010

У меня есть объект Stream, который иногда получает некоторые данные, но с непредсказуемыми интервалами. Сообщения, которые появляются в потоке, четко определены и заранее объявляют размер своей полезной нагрузки (размер представляет собой 16-разрядное целое число, содержащееся в первых двух байтах каждого сообщения).

Я хотел бы иметь класс StreamWatcher, который определяет, когда в Stream есть какие-то данные. Как только это произойдет, я бы хотел, чтобы событие было инициировано, чтобы подписанный экземпляр StreamProcessor мог обработать новое сообщение.

Может ли это быть сделано с событиями C # без непосредственного использования Threads? Кажется, это должно быть просто, но я не могу понять, как правильно спроектировать это.

Ответы [ 4 ]

14 голосов
/ 07 мая 2010

Когда вы говорите, что не используете потоки напрямую , я предполагаю, что вы все равно хотите использовать их косвенно через асинхронные вызовы, в противном случае это не очень полезно.

Все, что вам нужно сделать, это обернуть асинхронные методы Stream и сохранить результат в буфере. Во-первых, давайте определим часть события спецификации:

public delegate void MessageAvailableEventHandler(object sender,
    MessageAvailableEventArgs e);

public class MessageAvailableEventArgs : EventArgs
{
    public MessageAvailableEventArgs(int messageSize) : base()
    {
        this.MessageSize = messageSize;
    }

    public int MessageSize { get; private set; }
}

Теперь, прочитайте одно 16-битное целое число из потока асинхронно и доложите, когда он будет готов:

public class StreamWatcher
{
    private readonly Stream stream;

    private byte[] sizeBuffer = new byte[2];

    public StreamWatcher(Stream stream)
    {
        if (stream == null)
            throw new ArgumentNullException("stream");
        this.stream = stream;
        WatchNext();
    }

    protected void OnMessageAvailable(MessageAvailableEventArgs e)
    {
        var handler = MessageAvailable;
        if (handler != null)
            handler(this, e);
    }

    protected void WatchNext()
    {
        stream.BeginRead(sizeBuffer, 0, 2, new AsyncCallback(ReadCallback),
            null);
    }

    private void ReadCallback(IAsyncResult ar)
    {
        int bytesRead = stream.EndRead(ar);
        if (bytesRead != 2)
            throw new InvalidOperationException("Invalid message header.");
        int messageSize = sizeBuffer[1] << 8 + sizeBuffer[0];
        OnMessageAvailable(new MessageAvailableEventArgs(messageSize));
        WatchNext();
    }

    public event MessageAvailableEventHandler MessageAvailable;
}

Я думаю, вот и все. Это предполагает, что любой класс, обрабатывающий сообщение, также имеет доступ к Stream и готов читать его, синхронно или асинхронно, в зависимости от размера сообщения в событии. Если вы хотите, чтобы класс наблюдателя действительно прочитал все сообщение, вам нужно добавить еще код для этого.

2 голосов
/ 07 мая 2010

Да, это можно сделать. Используйте неблокирующий метод Stream.BeginRead с AsyncCallback . Обратный вызов вызывается асинхронно, когда данные становятся доступными. В обратном вызове вызовите Stream.EndRead , чтобы получить данные, и снова вызовите Stream.BeginRead , чтобы получить следующий фрагмент данных. Буферизуйте входящие данные в байтовом массиве, который достаточно велик для хранения сообщения. Когда байтовый массив заполнен (может потребоваться несколько обратных вызовов), вызовите событие. Затем прочитайте следующий размер сообщения, создайте новый буфер, повторите, готово.

1 голос
/ 25 ноября 2010

Разве использование Stream.BeginRead () не похоже на использование синхронного метода Stream.Read () в отдельном потоке?

1 голос
/ 07 мая 2010

Обычный подход заключается в использовании шаблона .NET асинхронного программирования , предоставляемого Stream. По сути, вы начинаете читать асинхронно, вызывая Stream.BeginRead, передавая ему буфер byte[] и метод обратного вызова, который будет вызываться, когда данные будут считаны из потока. В методе обратного вызова вы вызываете Stream.EndRead, передавая ему аргумент IAsncResult, который был передан вашему обратному вызову. Возвращаемое значение EndRead говорит вам, сколько байтов было считано в буфер.

После того, как вы получили первые несколько байтов таким образом, вы можете дождаться оставшейся части сообщения (если вы не получили достаточно данных в первый раз), снова вызвав BeginRead. Получив сообщение целиком, вы можете поднять событие.

...