Mon goose курсор получить размер потока - PullRequest
0 голосов
/ 28 февраля 2020

Доброе утро,

Я делаю запрос базы данных к своему MongoDB через Mon goose ODM в javascript. Этот запрос возвращает поток (использование функции курсора mon goose).

function findQuotes(from, to) {
  const selection = [
    'policyholder',
    'terms.startDate',
    'terms.variant',
  ];

  return Draft.find({
    lastPriceRequest: { $ne: null },
    updatedAt: {
      $gte: from.toISOString(),
      $lte: to.toISOString(),
    },
  })
    .select(selection.join(' '))
    .lean(true)
    .cursor({ batchSize: 200 }); 
}

Я перебираю этот поток с помощью функции Mon goose eachAsyn c. Во время этой итерации каждые 50 черновиков я хотел бы отправлять их в указанный c API конечной точки. Эта конечная точка API принимает только запрос POST (поток JSON в нашем случае невозможен).

Чтобы этот код работал, мне нужно количество документов в курсоре. В противном случае, он никогда не отправит последние черновики, которые меньше 50.

/**
 *
 * @param {timestamp} from
 * @param {timestamp} to
 * @returns {Promise}
 */
async function exportQuotes(from, to) {
  let i = 1;
  let res = [];

  const quotes = findQuotes(from, to);

  return quotes.eachAsync(async (doc) => {

    res.push(quoteMapper(doc)); // mapped to a specific schema

    if (i === 50) {
      await sendQuotesTest(res); 
      i = 0;
      res = [];
    }
    i += 1;
  });
}

Версия Mon goose: Версия MongoDB:

1 Ответ

0 голосов
/ 28 февраля 2020

Я не знаю, возможно ли получить количество ожидающих документов от курсора. Природа концепции потока как-то противоречит этому подходу.

При работе с потоками узлов я всегда использую highland. js lib. Highland. js для потоков, что loda sh для массивов. Это делает обработку потоков действительно простой и интуитивно понятной.

Я написал небольшое демонстрационное приложение, чтобы показать, как можно использовать высокогорье для решения вашей проблемы:

const h = require("highland")
const mongoose = require("mongoose")
const moment = require("moment")
const faker = require("faker")
const Promise = require("bluebird")

/////// Model ////////////////////////////////////////////////////////////////

const Quote = mongoose.model("Quote", {
   lastPriceRequest: Number,
   updatedAt: Date,
})

/////// START ////////////////////////////////////////////////////////////////

const from = moment().subtract(20, "days")
const to = moment()
console.log(`query from='${from.format("L")}' to='${to.format("L")}'`)

connect()
   // .then(() => addRandomQuotes(100))
   .then(() => getQuotesStream(from.toDate(), to.toDate()))
   .then(handleQuotesStream)
   .then(console.log)
   .catch(console.error)

/////// FNs ////////////////////////////////////////////////////////////////

async function connect () {
   await mongoose.connect('mongodb://localhost/highland-test-db', {
      useNewUrlParser: true,
      useUnifiedTopology: true
   })
}

async function addRandomQuotes (count) {
   for (var i = 0; i < count; i++) {
      let quote = {
         lastPriceRequest: faker.commerce.price(),
         updatedAt: faker.date.recent(60),
      }
      const Q = new Quote(quote)
      await Q.save()
   }
   console.log(`created ${count} random quotes`)
}

function getQuotesStream (from, to) {
   return Quote
      .find()
      .where("updatedAt").gte(from)
      .where("updatedAt").lt(to)
      .where("lastPriceRequest").ne(null)
      .sort("updatedAt")
      .lean()
      .cursor()
}

function handleQuotesStream (stream) {
   let count = 0
   return new Promise(function (resolve, reject) {
      h(stream)
         .tap(() => count++)
         .batch(50)
         .map(sendBatchToApiEndpoint)
         .series()
         .done(() => resolve({count}))
   })
}

function sendBatchToApiEndpoint (items) {
   console.log(`sending batch of ${items.length} items`)
   // send the data here / wrap the promise into highland (like return h(promise))
   return h(Promise.resolve(items))
}

Позволяет отправлять все запрашиваемые документы партиями. Все партии будут иметь размер 50. Последний размер партии будет произвольным (<= 50). Но вам не нужно заботиться о размерах партии самостоятельно. Это будет управляться Highland. </p>

Демонстрационный выход:

>node highland-test.js
query from='02/08/2020' to='02/28/2020'
sending batch of 50 items
sending batch of 50 items
sending batch of 50 items
sending batch of 50 items
sending batch of 50 items
sending batch of 47 items
{ count: 297 }
...