Поскольку я использую паркет js в node.js проекте. Я хочу направить поток сбора mongodb в паркетный поток записи и загрузить его на S3. В документации по паркету js хранится в физическом файле, как показано ниже:
ParquetWriter.openFile(schema, 'fruits.parquet');
Паркет js также имеет функцию, ниже приведены справочные материалы, но нет документации по их использованию.
ParquetWriter.openStream(schema, outputStream, opts)
https://github.com/ironSource/parquetjs/blob/master/lib/writer.js
А также имеет класс ниже, который преобразует поток в поток для записи.
class ParquetTransformer extends stream.Transform.
let streamParquet = new parquet.ParquetTransformer(parquetSchema)
Я написал ниже код
let readableStream=dbo.collection(collectionName).find(query);
itemsStream = cursor.stream();
let buffer = Buffer.from('');
writeableStream._write = (chunk, encoding, done) => {
buffer = Buffer.concat([buffer, chunk]);
done();
};
writeableStream.close = e => e(); // pass in an error if there was one
let parquetSchema = await convertJsonSchemaToParquetSchema(jsonSchema);
var parquetTransform = new parquet.ParquetTransformer(parquetSchema);
// pipe to connect the read, transform, and write streams
readableStream.pipe(StreamArray.withParser()).pipe(parquetTransform).pipe(writeableStream);
let result = await fileuploader.UploadFile(writeableStream, "bucketname");
Но, похоже, не работает. Может ли кто-нибудь подсказать мне, что я делаю не так. Также паркет js не предоставляет никаких документов для использования вышеупомянутой функции.