Я проверяю файлы CSV и затем записываю их в базу данных.Текущая настройка - File.pipe (validationStream) .pipe (DBWriteStream);Как мне обернуть все записи в моем DBWriteStream в транзакцию, которую можно отменить, если в validationStream обнаружены ошибки?Если ошибки найдены, я хочу отменить весь файл.
Node.js 7, как использовать транзакцию сиквелизирования с async / await?
Приведенный выше пример, а также приведенный в документации, зависят от цепочек обещаний и асинхронных функций.тем не менее, мой поток является функцией записи потока преобразования и не использует Promise.
async function getWriteStream(params) {
const rowLimit = 2000;
let rowsToWrite = [];
const writeStream = through2.obj(write, end);
// I'd like to rollback this report creation
const result = await createReportId(params);
const reportId = result.id;
// buffer = {isValid: bool, data: rowData}
// if I receive an "isValid: false" from the validation stream,
// i'd like to stop writing and rollback everything
function write(buffer, encoding, next) {
let rollBack = false;
if (buffer.isValid && !rollBack) {
buffer.data.forEach(e => {
e.some_report_id = reportId;
pushRows(e);
});
} else if (!rollBack) {
rollBack = true;
// rollback transaction?
}
next();
}
function end(done) {
writeRows();
done();
}
async function pushRows(row) {
rowsToWrite.push(row);
if (rowsToWrite.length >= rowLimit) {
await writeRows();
}
}
// want to rollback all of these writes
async function writeRows() {
if (rowsToWrite.length > 0) {
const toSave = rowsToWrite.slice(0);
rowsToWrite = [];
await Some.bulkCreate(toSave);
}
}
return writeStream;
}
Моя проблема в том, что я не хочу выдавать ошибку и останавливать поток проверки, потому что, если в CSV есть несколько ошибок, я хотел бы собрать всю эту информацию в первый раз и отправитьпользователю.
Кроме того, строки на самом деле записываются в функции «запись» потока преобразования, которая не является асинхронной функцией, поэтому я не могу ее ждать.