rxjs-поток API-интерфейса прокрутки эластичного поиска выдает пустой набор результатов - PullRequest
0 голосов
/ 12 июня 2018

Моя цель состоит в том, чтобы преобразовать результат elasticsearch в поток rxjs и подумал об этом, используя API scroll, извлекающий 1 точку данных при каждом вызове.Однако кажется, что мой поток rxjs не возвращает результатов для второго эластичного запроса (searchElastic).

Ниже приведен пример моего кода:

import * as Rx from 'rxjs';
import {elasticClient} from '../Helpers.ts';

function searchElastic({query, sort}) {
    const body: any = {
        size: 1,
        query,
        _source: { excludes: ['logbookType', 'editable', 'availabilityTag'] },
        sort
    };
    // keep the search results "scrollable" for 30 secs
    const scroll = '30s';

    return Rx.Observable
        .fromPromise(elasticClient.search({ index: 'data', body, scroll }))
        .concatMap(({_scroll_id, hits: {hits}}) => {
            const subject = new Rx.Subject();

            if(hits.length) {
                // initial data
                subject.onNext(hits[0]._source as ElasticDoc);
                console.log(hits[0]._id);

                const handleDoc = (err, res) => {
                    if(err) {
                        subject.onError(err);
                        return;
                    }

                    const {_scroll_id, hits: {hits}} = res;

                    if(!hits.length) {
                        subject.onCompleted();
                    } else {
                        subject.onNext(hits[0]._source as ElasticDoc);
                        console.log(hits[0]._id);
                        setImmediate(() =>
                            elasticClient.scroll({scroll, scrollId: _scroll_id},
                                handleDoc));
                    }
                };

                setImmediate(() =>
                    elasticClient.scroll({scroll, scrollId: _scroll_id},
                        handleDoc));
            } else {
                subject.onCompleted();
            }

            return subject.asObservable();
        });
}

function getEntries() {
    const entriesQuery = {
        query: {
            filtered: {
                filter: {
                    bool: {
                        must: [{
                            range: {
                                creationTimestamp: {
                                    gte: "2018-04-01T07:55:59.915Z",
                                    lte: "2018-04-01T07:57:08.915Z"
                                }
                            }
                        }, {
                            query: {
                                query_string: {
                                    query: "+type:*scan*"
                                }
                            }
                        }]
                    }
                }
            }
        },
        sort: [{
            creationTimestamp: {
                order: "asc"
            },
            id: {
                order: "asc"
            }
        }]
    };
    return searchElastic(entriesQuery)
        .concatMap(entry => {
            // all entries are logged correctly
            console.log(entry.id);
            // array that contains MongoDB _ids as strings
            const ancestors = entry.ancestors || [];

            // if no parents => doesn't match
            if(!ancestors.length) {
                return Rx.Observable.empty();
            }

            const parentsQuery = {
                query: {
                    filtered: {
                        filter: {
                            bool: {
                                must: [{
                                    range: {
                                        creationTimestamp: {
                                            gte: "2018-04-01T07:55:59.915Z",
                                            lte: "2018-04-01T07:57:08.915Z"
                                        }
                                    }
                                }, {
                                    query: {
                                        query_string: {
                                            query: "+type:*block* +name:*Report*"
                                        }
                                    }
                                }]
                            }
                        }
                    }
                },
                sort: [{
                    creationTimestamp: {
                        order: "asc"
                    },
                    id: {
                        order: "asc"
                    }
                }]
            };
            parentsQuery.query.filtered.filter.bool.must.push({
                terms: {
                    id: ancestors
                }
            });

            // fetch parent entries
            return searchElastic(parentsQuery)
                .count()
                .concatMap(count => {
                    // count is always 0 even though entries are logged
                    // in the searchElastic console.logs
                    console.log(entry.id, count);
                    return Rx.Observable.just(entry);
                });
        });
}

function executeQuery() {
    try {
       getEntries()
            .subscribe(
                (x) => console.log(x.id),
                err => console.error(err),
                () => {}
            )
    } catch(e) {
        console.error(e);
    }
}

Похоже, этоrxjs проблема, поскольку все записи предков регистрируются, но count всегда возвращает 0.

PS, используя elasticsearch v1.7

1 Ответ

0 голосов
/ 12 июня 2018

После игры с парой rxjs примеров с предметами создается впечатление, что предмет завершается (onCompleted), прежде чем наблюдатель подписывается на него.

Рабочий пример

var subject = new Rx.Subject();

var subscription = subject.subscribe(
  function(x) {
    console.log('onNext: ' + x);
  },
  function(e) {
    console.log('onError: ' + e.message);
  },
  function() {
    console.log('onCompleted');
  });

subject.onNext(1);
// => onNext: 1

subject.onNext(2);
// => onNext: 2

subject.onCompleted();
// => onCompleted
<!DOCTYPE html>
<html>

<head>
  <script src="//code.jquery.com/jquery-2.1.0.min.js"></script>
  <title>JS Bin</title>
  <script src="//cdn.jsdelivr.net/rsvp/3.0.6/rsvp.js"></script>
  <script src="//cdnjs.cloudflare.com/ajax/libs/rxjs/2.2.28/rx.all.min.js"></script>
</head>

</html>

Разбитый пример

var subject = new Rx.Subject();

subject.onNext(1);
// => onNext: 1

subject.onNext(2);
// => onNext: 2

subject.onCompleted();
// => onCompleted

var subscription = subject.subscribe(
  function(x) {
    console.log('onNext: ' + x);
  },
  function(e) {
    console.log('onError: ' + e.message);
  },
  function() {
    console.log('onCompleted');
  });
<!DOCTYPE html>
<html>

<head>
  <script src="//code.jquery.com/jquery-2.1.0.min.js"></script>
  <title>JS Bin</title>
  <script src="//cdn.jsdelivr.net/rsvp/3.0.6/rsvp.js"></script>
  <script src="//cdnjs.cloudflare.com/ajax/libs/rxjs/2.2.28/rx.all.min.js"></script>
</head>

</html>

Поэтому я исправил это, изменив searchElastic на следующее:

function searchElasticStream({query, sort}) {
    const body: any = {
        size: 1,
        query,
        _source: { excludes: ['logbookType', 'editable', 'availabilityTag'] },
        sort
    };
    // keep the search results "scrollable" for 30 secs
    const scroll = '30s';

    return Rx.Observable
        .fromPromise(elasticClient.search({ index: 'data', body, scroll }))
        .flatMap(({_scroll_id, hits: {hits}}) => {
            const subject = new Rx.Subject();

            // this made the difference
            setImmediate(() => {
                if(hits.length) {
                    // initial data
                    subject.onNext(hits[0]._source as ElasticDoc);

                    const handleDoc = (err, res) => {
                        if(err) {
                            subject.onError(err);
                            return;
                        }

                        const {_scroll_id, hits: {hits}} = res;

                        if(!hits.length) {
                            subject.onCompleted();
                        } else {
                            subject.onNext(hits[0]._source as ElasticDoc);
                            setImmediate(() =>
                                elasticClient.scroll({scroll, scrollId: _scroll_id},
                                    handleDoc));
                        }
                    };

                    setImmediate(() =>
                        elasticClient.scroll({scroll, scrollId: _scroll_id},
                            handleDoc));
                } else {
                    subject.onCompleted();
                }
            });

            return subject.asObservable();
        });
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...