Я не знаю, возможно ли получить количество ожидающих документов от курсора. Природа концепции потока как-то противоречит этому подходу.
При работе с потоками узлов я всегда использую highland. js lib. Highland. js для потоков, что loda sh для массивов. Это делает обработку потоков действительно простой и интуитивно понятной.
Я написал небольшое демонстрационное приложение, чтобы показать, как можно использовать высокогорье для решения вашей проблемы:
const h = require("highland")
const mongoose = require("mongoose")
const moment = require("moment")
const faker = require("faker")
const Promise = require("bluebird")
/////// Model ////////////////////////////////////////////////////////////////
const Quote = mongoose.model("Quote", {
lastPriceRequest: Number,
updatedAt: Date,
})
/////// START ////////////////////////////////////////////////////////////////
const from = moment().subtract(20, "days")
const to = moment()
console.log(`query from='${from.format("L")}' to='${to.format("L")}'`)
connect()
// .then(() => addRandomQuotes(100))
.then(() => getQuotesStream(from.toDate(), to.toDate()))
.then(handleQuotesStream)
.then(console.log)
.catch(console.error)
/////// FNs ////////////////////////////////////////////////////////////////
async function connect () {
await mongoose.connect('mongodb://localhost/highland-test-db', {
useNewUrlParser: true,
useUnifiedTopology: true
})
}
async function addRandomQuotes (count) {
for (var i = 0; i < count; i++) {
let quote = {
lastPriceRequest: faker.commerce.price(),
updatedAt: faker.date.recent(60),
}
const Q = new Quote(quote)
await Q.save()
}
console.log(`created ${count} random quotes`)
}
function getQuotesStream (from, to) {
return Quote
.find()
.where("updatedAt").gte(from)
.where("updatedAt").lt(to)
.where("lastPriceRequest").ne(null)
.sort("updatedAt")
.lean()
.cursor()
}
function handleQuotesStream (stream) {
let count = 0
return new Promise(function (resolve, reject) {
h(stream)
.tap(() => count++)
.batch(50)
.map(sendBatchToApiEndpoint)
.series()
.done(() => resolve({count}))
})
}
function sendBatchToApiEndpoint (items) {
console.log(`sending batch of ${items.length} items`)
// send the data here / wrap the promise into highland (like return h(promise))
return h(Promise.resolve(items))
}
Позволяет отправлять все запрашиваемые документы партиями. Все партии будут иметь размер 50. Последний размер партии будет произвольным (<= 50). Но вам не нужно заботиться о размерах партии самостоятельно. Это будет управляться Highland. </p>
Демонстрационный выход:
>node highland-test.js
query from='02/08/2020' to='02/28/2020'
sending batch of 50 items
sending batch of 50 items
sending batch of 50 items
sending batch of 50 items
sending batch of 50 items
sending batch of 47 items
{ count: 297 }