Импорт CSV с использованием схемы Mongoose - PullRequest
0 голосов
/ 15 мая 2018

В настоящее время мне нужно вставить большой файл CSV в базу данных mongo, и порядок значений должен определять ключ для записи в базе данных:

Пример файла CSV:

9,1557,358,286,Mutantville,4368,2358026,,M,0,0,0,1,0
9,1557,359,147,Wroogny,4853,2356061,,D,0,0,0,1,0

Код для разбора его на массивы:

var fs = require("fs");

var csv = require("fast-csv");

fs.createReadStream("rank.txt")
    .pipe(csv())
    .on("data", function(data){
        console.log(data);
    })
    .on("end", function(data){
        console.log("Read Finished");
    });

Вывод кода:

[ '9',
  '1557',
  '358',
  '286',
  'Mutantville',
  '4368',
  '2358026',
  '',
  'M',
  '0',
  '0',
  '0',
  '1',
  '0' ]
[ '9',
  '1557',
  '359',
  '147',
  'Wroogny',
  '4853',
  '2356061',
  '',
  'D',
  '0',
  '0',
  '0',
  '1',
  '0' ]

Как вставить массивы в схему mongoose, чтобы перейти в базу данных mongo?

Схема:

var mongoose = require("mongoose");


var rankSchema = new mongoose.Schema({
   serverid: Number,
   resetid: Number,
   rank: Number,
   number: Number,
   name: String,
   land: Number,
   networth: Number,
   tag: String,
   gov: String,
   gdi: Number,
   protection: Number,
   vacation: Number,
   alive: Number,
   deleted: Number
});

module.exports = mongoose.model("Rank", rankSchema);

Порядок массива должен соответствовать порядку схемы, например, в массиве первое число 9 должно всегда сохраняться, так как они используют ключ "serverid" и так далее.Я использую Node.JS

Ответы [ 2 ]

0 голосов
/ 15 мая 2018

Вы можете сделать это с помощью fast-csv, получив headers из определения схемы, которое будет возвращать проанализированные строки как "объекты". У вас действительно есть некоторые несоответствия, поэтому я отметил их исправлениями:

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            if ( counter > 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
              resolve();
            }
          } catch(e) {
            stream.destroy(e);
          }
        });

    });


  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }


})()

Пока схема действительно соответствует указанному CSV, все в порядке. Это те исправления, которые я вижу, но если вам нужно, чтобы фактические имена полей были выровнены по-другому, вам нужно отрегулировать. Но в основном там было Number в позиции, где есть String и, по сути, дополнительное поле, которое, как я полагаю, является пустым в CSV.

В общих чертах получают массив имен полей из схемы и передают его в опции при создании экземпляра парсера csv:

let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))

Как только вы действительно это сделаете, вы получите «Объект» вместо массива:

{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

Не беспокойтесь о «типах», потому что Mongoose будет приводить значения в соответствии со схемой.

Остальное происходит в обработчике для события data. Для максимальной эффективности мы используем insertMany() для записи в базу данных только один раз каждые 10 000 строк. То, как это происходит на сервере и процессах, зависит от версии MongoDB, но 10000 должны быть достаточно разумными, исходя из среднего числа полей, которые вы импортируете для одной коллекции, с точки зрения «компромисса» для использования памяти и записи разумный сетевой запрос. При необходимости уменьшите число.

Важные части - пометить эти вызовы как async функции и await результат insertMany() перед продолжением. Также нам нужно pause() потока и resume() для каждого элемента, в противном случае мы рискуем перезаписать buffer документов для вставки до их фактической отправки. pause() и resume() необходимы для создания «обратного давления» на трубу, в противном случае элементы просто «выходят» и запускают событие data.

Естественно, контроль для 10 000 записей требует, чтобы мы проверяли это как на каждой итерации, так и при завершении потока, чтобы очистить буфер и отправить все оставшиеся документы на сервер.

Это действительно то, что вы хотите сделать, поскольку вы, конечно, не хотите запускать асинхронный запрос к серверу как на «каждой» итерации в событии data, так и, по существу, не ожидая завершения каждого запроса. Вам не придется проверять это для «очень маленьких файлов», но при любой реальной загрузке вы наверняка превысите стек вызовов из-за асинхронных вызовов «в полете», которые еще не завершены.


К вашему сведению - package.json используется. mz не является обязательным, поскольку это просто модернизированная Promise библиотека стандартных встроенных библиотек узлов, к которой я просто привык. Код, конечно, полностью взаимозаменяем с модулем fs.

{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

На самом деле с Node v8.9.x и выше, мы даже можем сделать это намного проще с реализацией AsyncIterator через модуль stream-to-iterator. Он все еще находится в режиме Iterator<Promise<T>>, но должен работать до тех пор, пока Node v10.x не станет стабильным LTS:

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

По сути, вся обработка «события» потока, его приостановка и возобновление заменяется простым циклом for:

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

Легко! Это исправлено в более поздней реализации узла с for..await..of, когда оно станет более стабильным. Но вышеперечисленное прекрасно работает на указанной версии и выше.

0 голосов
/ 15 мая 2018

Говоря @Neil Lunn нужно заголовок в самом CSV.

Пример использования модуля csvtojson .

const csv = require('csvtojson');

const csvArray = [];
  csv()
    .fromFile(file-path)
    .on('json', (jsonObj) => {
      csvArray.push({ name: jsonObj.name, id: jsonObj.id });
    })
    .on('done', (error) => {
      if (error) {
        return res.status(500).json({ error});
      }
          Model.create(csvArray)
      .then((result) => {
         return res.status(200).json({result});
      }).catch((err) => {
          return res.status(500).json({ error});
      });
      });
    });
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...