Как получить живые данные из долгоживущего HTTP-соединения в C#? - PullRequest
0 голосов
/ 23 марта 2020

Я хочу обработать результаты долгоживущего HTTP-соединения с сервера, который я интегрирую с , как они происходят . Этот сервер возвращает одну строку JSON (\n с разделителями) для каждого "события" для обработки.

Учитывая экземпляр Stream, назначенный переменной changeStream, которая представляет байты из открытого HTTP-соединения вот пример того, что я делаю:

(request - это экземпляр WebRequest, настроенный для открытия соединения с сервером, с которым я интегрируюсь .)

var response = request.GetResponse();
var changeStream = response.GetResponseStream();

var lineByLine = Observable.Using(
    () => new StreamReader(changeStream),
    (streamReader) => streamReader.ReadLineAsync().ToObservable()
);

lineByLine.Subscribe((string line) =>
{
    System.Console.WriteLine($"JSON! ---------=> {line}");
});

Используя приведенный выше код, я получаю первую строку, отправляемую сервером, но затем ни одну после этого. Ни из первоначального ответа, ни из новых, сгенерированных в режиме реального времени.

  • В целях моего вопроса, предположим, что это соединение будет оставаться открытым неопределенно долго.
  • Как мне go о том, что system.reactive запускает обратный вызов для каждой линии, когда я с ними сталкиваюсь?

Обратите внимание: этот сценарий не подходит для переключения на SignalR

Ответы [ 2 ]

1 голос
/ 25 марта 2020

Даже если это выглядит сложнее, для этого лучше использовать встроенные операторы.

IObservable<string> query =
    Observable
        .FromAsync(() => request.GetResponseAsync())
        .SelectMany(
            r => Observable.Using(
                () => r,
                r2 => Observable.Using(
                    () => r2.GetResponseStream(),
                    rs => Observable.Using(
                        () => new StreamReader(rs),
                        sr =>
                            Observable
                                .Defer(() => Observable.Start(() => sr.ReadLine()))
                                .Repeat()
                                .TakeWhile(w => w != null)))));

Это не проверено, но должно быть близко.

1 голос
/ 23 марта 2020

Ваша попытка будет наблюдать только один ReadLineAsync звонок. Вместо этого вам нужно вернуть каждую строку. Вероятно, что-то вроде этого;

Observable.Create<string>(async o => {
    var response = await request.GetResponseAsync();
    var changeStream = response.GetResponseStream();
    using var streamReader = new StreamReader(changeStream);
    while (true)
    {
        var line = await streamReader.ReadLineAsync();
        if (line == null)
            break;
        o.OnNext(line);
    }
    o.OnCompleted();
});
...