RxJS: рекурсивный список наблюдаемых и один наблюдатель - PullRequest
8 голосов
/ 28 марта 2012

У меня возникли проблемы с рекурсивной цепочкой наблюдаемых.

Я работаю с RxJS, которая в настоящее время находится в версии 1.0.10621 и содержит основную функциональность Rx в сочетании с Rx для jQuery.

Позвольте мне представить пример сценария для моей проблемы: Я опрашиваю API поиска в Твиттере (ответ JSON) для твитов / обновлений, содержащих определенное ключевое слово. Ответ также включает "refresh_url", который следует использовать для генерации последующего запроса. Ответ на этот последующий запрос снова будет содержать новый refresh_url и т. Д.

Rx.jQuery позволяет мне заставить API поиска в Твиттере вызывать наблюдаемое событие, которое генерирует одно onNext и затем завершается. До сих пор я пытался, чтобы обработчик onNext запомнил refresh_url и использовал его в обработчике onCompleted для создания как нового наблюдаемого, так и соответствующего наблюдателя для следующего запроса. Таким образом, одна наблюдаемая пара + наблюдатель следует за другой бесконечно.

Проблема с этим подходом:

  1. Наблюдаемые наблюдатели / наблюдатели уже живы, когда их предшественники еще не уничтожены.

  2. Я должен сделать много мерзкой бухгалтерии, чтобы сохранить действительную ссылку на ныне живущего наблюдателя, которых на самом деле может быть два. (Один в onCompleted, а другой где-то еще в его жизненном цикле). Эта ссылка, конечно, необходима, чтобы отписаться / удалить наблюдателя. Альтернативой бухгалтерии может быть реализация побочного эффекта с помощью «все еще работает?» - логического значения, как я сделал в моем примере.

Пример кода:

            running = true;
            twitterUrl = "http://search.twitter.com/search.json";
            twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
            twitterMaxId = 0; //actually twitter ignores its since_id parameter

            newTweetObserver = function () {
                return Rx.Observer.create(
                        function (tweet) {
                            if (tweet.id > twitterMaxId) {
                                twitterMaxId = tweet.id;
                                displayTweet(tweet);
                            }
                        }
                    );
            }

            createTwitterObserver = function() {
                twitterObserver = Rx.Observer.create(
                        function (response) {
                            if (response.textStatus == "success") {
                                var data = response.data;
                                if (data.error == undefined) {
                                    twitterQuery = data.refresh_url;
                                    var tweetObservable;
                                    tweetObservable = Rx.Observable.fromArray(data.results.reverse());
                                    tweetObservable.subscribe(newTweetObserver());
                                }
                            }
                        },
                        function(error) { alert(error); },
                        function () {
                            //create and listen to new observer that includes a delay 
                            if (running) {
                                twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
                                twitterObservable.subscribe(createTwitterObserver());
                            }
                        } 
                    );
                return twitterObserver;
            }
            twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
            twitterObservable.subscribe(createTwitterObserver());

Не обманывайтесь двойным слоем наблюдаемых / наблюдателей от запросов к твитам. Мой пример касается в основном первого уровня: запрос данных из Twitter. Если при решении этой проблемы второй слой (преобразование ответов в твиты) может стать одним с первым, это было бы фантастически; Но я думаю, что это совсем другое. На данный момент.

Эрик Мейер указал мне на оператор Expand (см. Пример ниже) и предложил Объединить шаблоны в качестве альтернативы.

var ys = Observable.Expand
(new[]{0}.ToObservable() // initial sequence
                   , i => ( i == 10 ? Observable.Empty<int>() // terminate
         : new[]{i+1}.ToObservable() // recurse
 )
);

ys.ToArray().Select(a => string.Join(",", a)).DumpLive();

Это должно быть скопировано в LINQPad. Он предполагает одноэлементные наблюдаемые и производит одного конечного наблюдателя.

Итак, мой вопрос: как сделать трюк с расширением лучше всего в RxJS?

EDIT:
Оператор раскрытия может быть реализован, как показано в этой теме . Но для этого потребуется генераторов (а у меня только JS <1.6). <Br /> К сожалению, RxJS 2.0.20304-бета не реализует метод расширения.

1 Ответ

4 голосов
/ 24 декабря 2013

Итак, я попытаюсь решить вашу проблему немного иначе, чем вы, и возьму некоторые вольности, которые вы можете решить проще.

Итак, я не могу сказать, что вы пытаетесь выполнить следующие шаги

  • Получить первый список твитов, который содержит следующий URL
  • После получения списка твитов, onNext текущий наблюдатель и получить следующий набор твитов
    • Делать это бесконечно

Или есть действие пользователя (получить больше / прокрутка вниз).В любом случае, это действительно та же проблема.Возможно, я неправильно читаю вашу проблему.Вот ответ на этот вопрос.

function getMyTweets(headUrl) {
    return Rx.Observable.create(function(observer) {

        innerRequest(headUrl);
        function innerRequest(url) {
            var next = '';

            // Some magic get ajax function
            Rx.get(url).subscribe(function(res) {
                observer.onNext(res);
                next = res.refresh_url;
            },
            function() {
                // Some sweet handling code
                // Perhaps get head?
            },
            function() {
                innerRequest(next);
            });
        }
    });
}

Возможно, это не тот ответ, который вы просили.Если нет, извините!


Редактировать: После просмотра вашего кода, похоже, что вы хотите взять результаты в виде массива и наблюдать их.

// From the results perform a select then a merge (if ordering does not matter).
getMyTweets('url')
    .selectMany(function(data) {
        return Rx.Observable.fromArray(data.results.reverse());
    });

// Ensures ordering
getMyTweets('url')
    .select(function(data) {
        return Rx.Observable.fromArray(data.results.reverse());
    })
    .concat();
...