В rxjs, как я могу связать в цепочке массивы данных, полученных из разных API? - PullRequest
0 голосов
/ 27 января 2019

Я вызываю API и получаю массив результатов, я проверяю на нумерацию страниц и, если больше страниц существует, я вызываю следующую страницу, повторяю, пока не останется больше страниц.

Для каждого массива результатовЯ вызываю другую конечную точку и делаю то же самое: получаю массив результатов, проверяю другую страницу и снова вызываю конечную точку.Стирать, промыть, повторить.

Например:

Я хочу получить список стран, которые могут быть разбиты на страницы, затем для каждой страны я хочу получить список городов, которые могуттакже быть разбитым на страницы.И для каждого города я выполняю набор преобразований, а затем сохраняю их в базе данных.

Я уже пробовал это, но застрял:


const grabCountries = Observable.create(async (observer) => {
    const url = 'http://api.com/countries'
    let cursor = url
    do {

        const results = fetch(cursor)

        // results = { 
        //   data: [ 'Canada', 'France', 'Spain' ],
        //   next: '47asd8f76358df8f4058898fd8fab'
        // }

        results.data.forEach(country => { observer.next(country) })

        cursor = results.next ? `${url}/${results.next}` : undefined

    } while(cursor)

})


const getCities = {
    next: (country) => {
        const url = 'http://api.com/cities'
        let cursor = url
        do {

            const results = fetch(cursor)

            // results = {
            //     data: [ 
            //         'Montreal', 'Toronto', 
            //         'Paris', 'Marseilles', 
            //         'Barcelona', 'Madrid' 
            //     ],
            //     next: '89ghjg98nd8g8sdfg98gs9h868hfoig'
            // }

            results.data.forEach(city => { 
                `**** What do I do here?? ****` 
            })

            cursor = results.next ? `${url}/${results.next}` : undefined

        } while(cursor)
    }
}

Я попробовал несколько подходов:

Создание предмета (иногда мне нужно сделать параллельно обработанную базуо результатах 'grabCountries'. Например, я могу захотеть хранить страны в БД параллельно с захватом городов.)

const intermediateSubject = new Subject()

intermediateSubject.subscribe(storeCountriesInDatabase)
intermediateSubject.subscribe(getCities)

Я также пробовал прокладывать и отображать, но, похоже, это в основномто же самое.

Когда я писал это, я думал об этом решении, и оно, кажется, работает нормально, я просто хотел бы знать, делаю ли я это слишком сложным.Могут быть случаи, когда мне нужно сделать больше, чем несколько вызовов API подряд.(Представьте себе, Страны => Штаты => Города => Пекарни => Отзывы => Комментарии => Ответы) Так что это странное сопоставление с другим шаблоном обратного вызова наблюдателя может стать неприятным.

Так что это то, что я сейчас имею в основном:

// grabCountries stays the same as above, but the rest is as follows:

const grabCities = (country) =>
  Observable.create(async (observer) => {
    const url = `http://api.com/${country}/cities`
      let cursor = url
      do {
       const results = fetch(cursor)

       // results = {
       //     data: [
       //         'Montreal', 'Toronto',
       //         'Paris', 'Marseilles',
       //         'Barcelona', 'Madrid'
       //     ],
       //     next: '89ghjg98nd8g8sdfg98gs9h868hfoig'
       // }

       results.data.forEach(city => {
         observer.next(city)
       })

    cursor = results.next ? `${url}/${results.next}` : undefined

    } while (cursor)
})

const multiCaster = new Subject()

grabCountries.subscribe(multiCaster)
multiCaster.pipe(map((country) => {
    grabCities(country).pipe(map(saveCityToDB)).subscribe()
})).subscribe()
multiCaster.pipe(map(saveCountryToDB)).subscribe()

tl; dr - я вызываю API, который получает разбитый на страницы набор результатов в массиве, и мне нужно отобразить каждый элемент и вызывать другой API, который получает другой разбитый на страницы набор результатов, каждый набортакже в массиве.

Является ли наилучшим способом вложение одного наблюдаемого в другое и отображение результатов через callApiForCountries.pipe (map (forEachCountryCallApiForCities)) 'или у вас есть другие рекомендации?

Ответы [ 3 ]

0 голосов
/ 28 января 2019

Вот код, который должен работать с последовательным сканированием следующего URL.Вы начинаете с {next: url}, пока res.next не будет доступен.

of({next:http://api.com/cities}).pipe(
    expand(res=>results.next ? `${url}/${results.next}` : undefined
    takeWhile(res=>res.next!==undefined)
).subscribe()
0 голосов
/ 04 февраля 2019

Хорошо, поэтому я потратил много сил на это и придумал два решения, которые, кажется, работают.

const nestedFlow = () => {
	fetchAccountIDs.pipe(map(accountIds => {
		getAccountPostIDs(accountIds) // Has the do loop for paging inside
			.pipe(
				map(fetchPostDetails),
				map(mapToDBFormat),
				map(storeInDB)
			).subscribe()
	})).subscribe()
}


const expandedflow = () => {
	fetchAccountIDs.subscribe((accountId) => {
		// accountId { accountId: '345367geg55sy'}
		getAccountPostIDs(accountId).pipe(
			expand((results) => {
				/*
				results : {
					postIDs: [
						131424234,
						247345345,
					],
					cursor: '374fg8v0ggfgt94',
				}
				*/
				const { postIDs, cursor } = results
				if (cursor) return getAccountPostIDs({...accountId, cursor})
				return { postIDs, cursor }
			}),
			takeWhile(hasCursor, true), // recurs until cursor is undefined
			concatMap(data => data.postIDs), 
			map(data => ({ post_id: data })), 
			map(fetchPostDetails), 
			map(mapToDBFormat), 
			map(storeInDB) 
		).subscribe()
	})
}

Кажется, что оба работают с одинаковой производительностью.Я читал кое-что, где выход из потока данных - плохая практика, и вы должны все передавать по конвейеру, но я не знаю, как устранить первый выход в «extendedFlow», потому что «expand» должен вызывать наблюдаемый, но, возможно,это может быть сделано.

Теперь мне просто нужно решить проблемы состояния гонки с момента вызова 'complete' в getAccountPostIDs, когда последняя запись сохраняется в БД.В настоящее время в моем тесте observer.complete заканчивается до 3-х из действий upsert.

Любые комментарии приветствуются, и я надеюсь, что это поможет кому-то в будущем.

0 голосов
/ 28 января 2019

Вам нужен оператор expand .Он ведет себя рекурсивно, поэтому он соответствует идее иметь разбитые на страницы результаты.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...