У меня возникли проблемы с рекурсивной цепочкой наблюдаемых.
Я работаю с RxJS, которая в настоящее время находится в версии 1.0.10621 и содержит основную функциональность Rx в сочетании с Rx для jQuery.
Позвольте мне представить пример сценария для моей проблемы:
Я опрашиваю API поиска в Твиттере (ответ JSON) для твитов / обновлений, содержащих определенное ключевое слово. Ответ также включает "refresh_url", который следует использовать для генерации последующего запроса. Ответ на этот последующий запрос снова будет содержать новый refresh_url и т. Д.
Rx.jQuery позволяет мне заставить API поиска в Твиттере вызывать наблюдаемое событие, которое генерирует одно onNext и затем завершается. До сих пор я пытался, чтобы обработчик onNext запомнил refresh_url и использовал его в обработчике onCompleted для создания как нового наблюдаемого, так и соответствующего наблюдателя для следующего запроса. Таким образом, одна наблюдаемая пара + наблюдатель следует за другой бесконечно.
Проблема с этим подходом:
Наблюдаемые наблюдатели / наблюдатели уже живы, когда их предшественники еще не уничтожены.
Я должен сделать много мерзкой бухгалтерии, чтобы сохранить действительную ссылку на ныне живущего наблюдателя, которых на самом деле может быть два. (Один в onCompleted, а другой где-то еще в его жизненном цикле). Эта ссылка, конечно, необходима, чтобы отписаться / удалить наблюдателя.
Альтернативой бухгалтерии может быть реализация побочного эффекта с помощью «все еще работает?» - логического значения, как я сделал в моем примере.
Пример кода:
running = true;
twitterUrl = "http://search.twitter.com/search.json";
twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
twitterMaxId = 0; //actually twitter ignores its since_id parameter
newTweetObserver = function () {
return Rx.Observer.create(
function (tweet) {
if (tweet.id > twitterMaxId) {
twitterMaxId = tweet.id;
displayTweet(tweet);
}
}
);
}
createTwitterObserver = function() {
twitterObserver = Rx.Observer.create(
function (response) {
if (response.textStatus == "success") {
var data = response.data;
if (data.error == undefined) {
twitterQuery = data.refresh_url;
var tweetObservable;
tweetObservable = Rx.Observable.fromArray(data.results.reverse());
tweetObservable.subscribe(newTweetObserver());
}
}
},
function(error) { alert(error); },
function () {
//create and listen to new observer that includes a delay
if (running) {
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
twitterObservable.subscribe(createTwitterObserver());
}
}
);
return twitterObserver;
}
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
twitterObservable.subscribe(createTwitterObserver());
Не обманывайтесь двойным слоем наблюдаемых / наблюдателей от запросов к твитам.
Мой пример касается в основном первого уровня: запрос данных из Twitter. Если при решении этой проблемы второй слой (преобразование ответов в твиты) может стать одним с первым, это было бы фантастически; Но я думаю, что это совсем другое. На данный момент.
Эрик Мейер указал мне на оператор Expand (см. Пример ниже) и предложил Объединить шаблоны в качестве альтернативы.
var ys = Observable.Expand
(new[]{0}.ToObservable() // initial sequence
, i => ( i == 10 ? Observable.Empty<int>() // terminate
: new[]{i+1}.ToObservable() // recurse
)
);
ys.ToArray().Select(a => string.Join(",", a)).DumpLive();
Это должно быть скопировано в LINQPad. Он предполагает одноэлементные наблюдаемые и производит одного конечного наблюдателя.
Итак, мой вопрос: как сделать трюк с расширением лучше всего в RxJS?
EDIT:
Оператор раскрытия может быть реализован, как показано в этой теме . Но для этого потребуется генераторов (а у меня только JS <1.6). <Br />
К сожалению, RxJS 2.0.20304-бета не реализует метод расширения.