Вы можете сделать это с помощью fast-csv, получив headers
из определения схемы, которое будет возвращать проанализированные строки как "объекты". У вас действительно есть некоторые несоответствия, поэтому я отметил их исправлениями:
const fs = require('mz/fs');
const csv = require('fast-csv');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String, // <-- You have this as Number but it's a string
networth: Number,
tag: String,
stuff: String, // the empty field in the csv
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
console.log(headers);
await new Promise((resolve,reject) => {
let buffer = [],
counter = 0;
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
.on("error", reject)
.on("data", async doc => {
stream.pause();
buffer.push(doc);
counter++;
log(doc);
try {
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
stream.destroy(e);
}
stream.resume();
})
.on("end", async () => {
try {
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
resolve();
}
} catch(e) {
stream.destroy(e);
}
});
});
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
Пока схема действительно соответствует указанному CSV, все в порядке. Это те исправления, которые я вижу, но если вам нужно, чтобы фактические имена полей были выровнены по-другому, вам нужно отрегулировать. Но в основном там было Number
в позиции, где есть String
и, по сути, дополнительное поле, которое, как я полагаю, является пустым в CSV.
В общих чертах получают массив имен полей из схемы и передают его в опции при создании экземпляра парсера csv:
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
Как только вы действительно это сделаете, вы получите «Объект» вместо массива:
{
"serverid": "9",
"resetid": "1557",
"rank": "358",
"name": "286",
"land": "Mutantville",
"networth": "4368",
"tag": "2358026",
"stuff": "",
"gov": "M",
"gdi": "0",
"protection": "0",
"vacation": "0",
"alive": "1",
"deleted": "0"
}
Не беспокойтесь о «типах», потому что Mongoose будет приводить значения в соответствии со схемой.
Остальное происходит в обработчике для события data
. Для максимальной эффективности мы используем insertMany()
для записи в базу данных только один раз каждые 10 000 строк. То, как это происходит на сервере и процессах, зависит от версии MongoDB, но 10000 должны быть достаточно разумными, исходя из среднего числа полей, которые вы импортируете для одной коллекции, с точки зрения «компромисса» для использования памяти и записи разумный сетевой запрос. При необходимости уменьшите число.
Важные части - пометить эти вызовы как async
функции и await
результат insertMany()
перед продолжением. Также нам нужно pause()
потока и resume()
для каждого элемента, в противном случае мы рискуем перезаписать buffer
документов для вставки до их фактической отправки. pause()
и resume()
необходимы для создания «обратного давления» на трубу, в противном случае элементы просто «выходят» и запускают событие data
.
Естественно, контроль для 10 000 записей требует, чтобы мы проверяли это как на каждой итерации, так и при завершении потока, чтобы очистить буфер и отправить все оставшиеся документы на сервер.
Это действительно то, что вы хотите сделать, поскольку вы, конечно, не хотите запускать асинхронный запрос к серверу как на «каждой» итерации в событии data
, так и, по существу, не ожидая завершения каждого запроса. Вам не придется проверять это для «очень маленьких файлов», но при любой реальной загрузке вы наверняка превысите стек вызовов из-за асинхронных вызовов «в полете», которые еще не завершены.
К вашему сведению - package.json
используется. mz
не является обязательным, поскольку это просто модернизированная Promise
библиотека стандартных встроенных библиотек узлов, к которой я просто привык. Код, конечно, полностью взаимозаменяем с модулем fs
.
{
"description": "",
"main": "index.js",
"dependencies": {
"fast-csv": "^2.4.1",
"mongoose": "^5.1.1",
"mz": "^2.7.0"
},
"keywords": [],
"author": "",
"license": "ISC"
}
На самом деле с Node v8.9.x и выше, мы даже можем сделать это намного проще с реализацией AsyncIterator
через модуль stream-to-iterator
. Он все еще находится в режиме Iterator<Promise<T>>
, но должен работать до тех пор, пока Node v10.x не станет стабильным LTS:
const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String,
networth: Number,
tag: String,
stuff: String, // the empty field
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
//console.log(headers);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }));
const iterator = await streamToIterator(stream).init();
let buffer = [],
counter = 0;
for ( let docPromise of iterator ) {
let doc = await docPromise;
buffer.push(doc);
counter++;
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
}
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
По сути, вся обработка «события» потока, его приостановка и возобновление заменяется простым циклом for
:
const iterator = await streamToIterator(stream).init();
for ( let docPromise of iterator ) {
let doc = await docPromise;
// ... The things in the loop
}
Легко! Это исправлено в более поздней реализации узла с for..await..of
, когда оно станет более стабильным. Но вышеперечисленное прекрасно работает на указанной версии и выше.