Как подключиться к сокету TCP, отправить данные для авторизации перед получением данных - PullRequest
1 голос
/ 11 марта 2012

Я использую Rx для подключения к сокету и получения данных.У меня проблема в том, что после подключения к сокету мне необходимо отправить данные для авторизации, прежде чем данные могут быть получены.

Соединение

public static IObservable<Unit> WhenConnected(this Socket socket, IPAddress address, int port)
{
    return Observable.FromAsyncPattern<IPAddress, int>(
        socket.BeginConnect,
        socket.EndConnect)(address, port);
}

ПолучениеДанные

Не будет публиковать весь код, но в итоге он использует TakeWhile для многократного получения байтов от асинхронных методов Начало / Конец приема.

var receiveData = Observable.FromAsyncPattern
    <byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);

Вопрос 1:

Теперь моя проблема в том, как мне построить свою подписку?

var query = from _ in socket.WhenConnected(IPAddress.Parse(_host), _port)
            //need to authorize before receiving data
            from value socket.DataReceived().Repeat()
            select value;

using (query.Subscribe(...

Вопрос 2

Обычно я отправляюпакеты информации, используя NetworkStream, а не Socket, так что мне нужно сделать что-то совершенно другое?

1 Ответ

0 голосов
/ 12 марта 2012

Библиотека Rxx имеет несколько методов расширения, которые очень полезны для такого рода операций. Внутри StreamExtensions.cs версии 1.3 находится

public static IObservable<byte[]> ReadObservable(this Stream stream, int count)

и

public static IObservable<Unit> WriteObservable(this Stream stream, byte[] buffer, int offset, int count)

, который можно использовать в сетевом потоке. Я не уверен, что следующее является лучшим способом, но его можно настроить следующим образом:

NetworkStream networkStream = null;
var setupStreamPlan = Observable
  .When(WhenConnected(...))
  .Then(_ => networkStream = new NetworkStream(socket));
var authorizePlan = Observable
  .When(setupStreamPlan)
  .Then(_ => networkStream.WriteObserable(/*message to write*/);
var listenPlan = Observable
  .When(authorizePlan)
  .Then(_ => networkStream.ReadObservable(10).Repeat());
var constantSizeMessageStream = Observable
  .When(listenPlan)
  .Switch();

EDIT: разрешено настраивать networkStream

...