После игры с парой rxjs
примеров с предметами создается впечатление, что предмет завершается (onCompleted
), прежде чем наблюдатель подписывается на него.
Рабочий пример
var subject = new Rx.Subject();
var subscription = subject.subscribe(
function(x) {
console.log('onNext: ' + x);
},
function(e) {
console.log('onError: ' + e.message);
},
function() {
console.log('onCompleted');
});
subject.onNext(1);
// => onNext: 1
subject.onNext(2);
// => onNext: 2
subject.onCompleted();
// => onCompleted
<!DOCTYPE html>
<html>
<head>
<script src="//code.jquery.com/jquery-2.1.0.min.js"></script>
<title>JS Bin</title>
<script src="//cdn.jsdelivr.net/rsvp/3.0.6/rsvp.js"></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/rxjs/2.2.28/rx.all.min.js"></script>
</head>
</html>
Разбитый пример
var subject = new Rx.Subject();
subject.onNext(1);
// => onNext: 1
subject.onNext(2);
// => onNext: 2
subject.onCompleted();
// => onCompleted
var subscription = subject.subscribe(
function(x) {
console.log('onNext: ' + x);
},
function(e) {
console.log('onError: ' + e.message);
},
function() {
console.log('onCompleted');
});
<!DOCTYPE html>
<html>
<head>
<script src="//code.jquery.com/jquery-2.1.0.min.js"></script>
<title>JS Bin</title>
<script src="//cdn.jsdelivr.net/rsvp/3.0.6/rsvp.js"></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/rxjs/2.2.28/rx.all.min.js"></script>
</head>
</html>
Поэтому я исправил это, изменив searchElastic
на следующее:
function searchElasticStream({query, sort}) {
const body: any = {
size: 1,
query,
_source: { excludes: ['logbookType', 'editable', 'availabilityTag'] },
sort
};
// keep the search results "scrollable" for 30 secs
const scroll = '30s';
return Rx.Observable
.fromPromise(elasticClient.search({ index: 'data', body, scroll }))
.flatMap(({_scroll_id, hits: {hits}}) => {
const subject = new Rx.Subject();
// this made the difference
setImmediate(() => {
if(hits.length) {
// initial data
subject.onNext(hits[0]._source as ElasticDoc);
const handleDoc = (err, res) => {
if(err) {
subject.onError(err);
return;
}
const {_scroll_id, hits: {hits}} = res;
if(!hits.length) {
subject.onCompleted();
} else {
subject.onNext(hits[0]._source as ElasticDoc);
setImmediate(() =>
elasticClient.scroll({scroll, scrollId: _scroll_id},
handleDoc));
}
};
setImmediate(() =>
elasticClient.scroll({scroll, scrollId: _scroll_id},
handleDoc));
} else {
subject.onCompleted();
}
});
return subject.asObservable();
});
}