Я новичок в потоках, и мне нужно загрузить файл с ftp и отправить его в виде потока на azure stageblock (npm @ azure / storage-blob) - PullRequest
0 голосов
/ 16 июня 2020

ftp отправляет данные в потоке с возможностью записи, а метод azure stageblock принимает данные в читаемом потоке, и когда я пытаюсь сохранить данные, я получаю сообщение об ошибке. Ошибка: тело должно быть строкой, Blob, ArrayBuffer, ArrayBufferView или функцией, возвращающей NodeJS .ReadableStream.

    const ftp = require("basic-ftp");
    var fs = require("fs");
    const { BlobServiceClient ,BlockBlobClient} = require('@azure/storage-blob');
    var {Transform}=require('stream');
    var stream = require('stream');
    var fileWriteStream = new stream.Transform();
async function connectFTP(retry = 1, recon = false) {

    const blobServiceClient = await BlobServiceClient.fromConnectionString("****");
    var containerClient = await blobServiceClient.getContainerClient("***");//Establishing ContainerClient
    const blockBlobClient = await containerClient.getBlockBlobClient("Myblock.txt");
    var fileSize=0;
    const client = new ftp.Client(0);
    client.ftp.verbose = false;

    var ftpCredentials = 
        {
            host: "****",
            user: "***",
            password: "***$",
            secure: ***
        };


        try {
            let fileSize=0;
            fileWriteStream._transform = async function (fileChunk, encoding, done) {
                 fileSize += fileChunk.length;

             this.push(fileChunk)

         //HERE I AM GETTING ERROR WHILE SENDING DATA AS TRANSORM AS IT CAN BE READABLE OR WRITABLE STREAM
                blockBlobClient.stageBlock(Buffer.from('id').toString('base64'), fileWriteStream, fileSize)
            };
            return await client.access(ftpCredentials).then((conRes) => {
                conRes.ftpClient = client;
                 client.downloadTo(fileWriteStream,"/Demo Test 3/Patch16 Items.txt").then((res)=>{ console.log("success",res)
                 blockBlobClient.commitBlockList(["ALLBLOCKIDS"]).then((res)=>{console.log("Res",res)}).catch((err)=>console.log("ERR",err))
                }).catch((err)=>{console.log(err)})
                 console.log("client",client);
                return conRes;
            }).catch(async (conRes) => {
                //console.log('retry catch ', retry);
                console.log(conRes);

                if (conRes.code != 220) {
                    if (client.closed && retry <= 3) {
                        retry++;
                        console.log("retry")
                        return await connectFTP(retry)
                    }
                    else {
                        return client;
                    }
                }
                else
                    return conRes;
            });
        }
        catch (err) {
            console.log('err client ', err);
            return err;
        }
    };
    connectFTP();

1 Ответ

0 голосов
/ 23 июня 2020

Согласно моему тесту, мы можем использовать следующий код для загрузки файлов с FTP-сервера, а затем загрузить файл в Azure blob

var stream = require('stream');
var  ftp = require("basic-ftp");
var process =require('process')
const { BlobServiceClient} = require('@azure/storage-blob');
var uuid = require("uuid");

example()
async function example() {

    const client = await new ftp.Client()
    client.ftp.verbose = true
    try {
        await client.access({
            host: "13.76.32.70",
            user: "testqw",
            password: "Password0123!"
        })
        const blobServiceClient = await BlobServiceClient.fromConnectionString("*");
        var containerClient = await blobServiceClient.getContainerClient("output");//Establishing ContainerClient
        const blockBlobClient = await containerClient.getBlockBlobClient("sample.csv");
        const blockList=[]
        const lowCaseTransform = await new stream.Transform();
        lowCaseTransform._transform= async function (fileChunk, encoding, callback){
            var id = uuid.v4();
            const blockId=Buffer.from(id).toString("base64")
            blockList.push(blockId)           
            await blockBlobClient.stageBlock(blockId,fileChunk,fileChunk.length);
            callback();

        } 
        lowCaseTransform.pipe(process.stdout)
        await  client.downloadTo(lowCaseTransform,"/home/testqw/sample.csv")

        const result= await blockBlobClient.commitBlockList(blockList);
        console.log(result._response.status)
       
        
    }
    catch(err) {
        console.log(err)
    }
     client.close()
}

enter image description here enter image description here


Update

var containerClient = await blobServiceClient.getContainerClient("output");//Establishing ContainerClient
        const blockBlobClient = await containerClient.getBlockBlobClient("sample1.csv");
        const blockList=[]
        const lowCaseTransform = await new stream.Transform();
        const pass= await new stream.PassThrough()
        
        const r=[]
        lowCaseTransform._transform= async function (fileChunk, encoding, callback){
            //lowCaseTransform.push(fileChunk)
            r.push(fileChunk)
            callback();

          } 
        lowCaseTransform.pipe(process.stdout)
        await  client.downloadTo(lowCaseTransform,"/home/testqw/sample.csv")
       var t = Buffer.concat(r)

        const result= await blockBlobClient.upload(t,t.byteLength);
        console.log(result._response.status)

введите описание изображения здесь

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...