В моем последнем посте я пытался буферизовать ожидающие HTTP-запросы с помощью Rx JS. Я думал, что bufferCount
- это то, что мне нужно, но я обнаружил, что мои элементы были меньше размера буфера, он просто ждал, а я не хотел этого.
Теперь у меня есть новая схема, использующая take
. Кажется, он делает то, что я хочу, за исключением случаев, когда в моем результирующем наблюдаемом нет элементов (слева), полное никогда не вызывается.
Например, у меня есть что-то вроде следующего ..
const pendingRequests = this.store$.select(mySelects.getPendingRequests).pipe(
// FlatMap turns the observable of a single Requests[] to observable of Requests
flatMap(x => x),
// Only get requests unprocessed
filter(x => x.processedState === ProcessedState.unprocessed),
// Batches of batchSize in each emit
take(3),
);
let requestsSent = false;
pendingRequests.subscribe(nextRequest => {
requestsSent = true;
this.sendRequest(nextEvent);
},
error => {
this.logger.error(`${this.moduleName}.sendRequest: Error ${error}`);
},
() => {
// **** This is not called if pendingRequests is empty ****
if (requestsSent ) {
this.store$.dispatch(myActions.continuePolling());
} else {
this.store$.dispatch(myActions.stopPolling());
}
}
);
Таким образом, take(3)
получит следующие 3 ожидающих запроса и отправит их (), где я также отправляю действие, чтобы установить обработанное состояние на not ProcessedState.pending, чтобы мы не получили их в следующем опросе)
Все это работает нормально, но когда pendingRequests
в конечном итоге ничего не возвращает (пуст), блок completed
отмечен знаком ****. не называется. Я бы подумал, что это будет вызвано сразу.
Я не уверен, имеет ли это значение, поскольку, поскольку я не отправляю действие для продолжения опроса, опрос останавливается.
Но меня больше всего беспокоит то, что если pendingRequests
не заполнено, нужно ли мне отказаться от подписки, чтобы предотвратить утечки? Я предполагаю, что если complete
вызывается , мне не нужно отказываться от подписки?
Обновление
Чтобы pendingReguests
всегда выполнялся, я немного другой подход. Вместо того, чтобы использовать операторы rx
для «фильтрации», я просто получаю каждый раз весь список и только take(1)
в нем. Я всегда буду получать список, даже если он пуст, поэтому pendingReguests
будет завершаться каждый раз.
ie
const pendingRequests = this.store$.select(mySelects.getPendingRequests).pipe(take(1))
И тогда я могу просто фильтровать и загружать внутри наблюдаемый ..
pendingRequests.subscribe(nextRequest => {
let requestToSend = nextRequest.filter(x => x.processedState === ProcessedState.unprocessed);
const totalPendingCount = requestToSend.length;
requestToSend = requestToSend slice(0, this.batchSize);
for (const nextRequest of requestToSend) {
this.sendRequest(nextRequest);
}
if (totalPendingCount > this.batchSize) {
this.store$.dispatch(myActions.continuePolling());
}
В моем тестировании до сих пор я всегда получал complete
для запуска.
Кроме того, имея 2 действия (startPolling и continuePolling ) Я могу поместить задержку только в continuePolling, поэтому при первом запуске опроса (например, приложение только что вернулось в сеть после того, как оно вышло за пределы диапазона сети), мы отправляем сразу же и задерживаем, только если у нас больше, чем размер пакета
Возможно, это не 100% способ "rxy", но, похоже, пока работает. Здесь какие-то проблемы?