gRP C клиентская потоковая передача rp c ошибка конвейера. (запись после завершения ERROR) - PullRequest
0 голосов
/ 12 января 2020

Я изучаю gRP C программирование сервера-клиента на узле времени выполнения.

Я обнаружил ошибку при потоковой передаче клиента rp c. см. следующую подпись метода rp c.

service RouteGuide{
  rpc DataStreaming(stream File) returns (Stats) {}
}

message Stats{
  string msg=1;
}

message File{
  bytes chk=1;
}

Я хочу загрузить файл с клиента на сервер. поэтому я определил клиентскую потоковую передачу rp c.

Проблема в том, что загрузка файла будет успешной только в первый раз.

, когда я пытаюсь загрузить другой файл, то я получаю ошибку. запись после завершения ERROR.

Я думаю, что не очень хорошо справляюсь с потоком. Может кто-нибудь помочь, почему это происходит? Спасибо!

// server.js
"use strict";

const grpc = require("grpc");
const protoLoader = require("@grpc/proto-loader");
const path = require("path");
const fs = require("fs");
const stream = require("stream");

const PROTO_PATH = path.join(__dirname, "proto", "route.proto"); //    path.resolve("proto", "route.proto")
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
  keepCase: false,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
});
const routeguide = grpc.loadPackageDefinition(packageDefinition).routeguide;

const myTransformStream = new stream.Transform({
  objectMode: true,
  transform(data, enc, cb) {
    cb(null, data.chk.toString());
  }
});

function dataStreaming(strm, cb) {
  console.log("server : streaming function");
  stream.pipeline(
    strm,
    myTransformStream,
    fs.createWriteStream("output.txt"),
    err => {
      if (err) {
        console.log(`server side error : ${err}`);
        cb(err);
      } else {
        console.log("server side no error");
        cb(null, "server side finish");
      }
    }
  );
}

function getServer() {
  const server = new grpc.Server();
  server.addService(routeguide.RouteGuide.service, {
    DataStreaming: dataStreaming
  });
  return server;
}

if (require.main === module) {
  const routeServer = getServer();
  routeServer.bind("localhost:3333", grpc.ServerCredentials.createInsecure());
  routeServer.start();
}

=============

// client.js
"use strict";

const grpc = require("grpc");
const protoLoader = require("@grpc/proto-loader");
const path = require("path");
const fs = require("fs");
const stream = require("stream");

const PROTO_PATH = path.join(__dirname, "proto", "route.proto"); //    path.resolve("proto", "route.proto")
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
  keepCase: false,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
});
const routeguide = grpc.loadPackageDefinition(packageDefinition).routeguide;
const client = new routeguide.RouteGuide(
  "localhost:3333",
  grpc.credentials.createInsecure()
);

const MyTransform = new stream.Transform({
  objectMode: true,
  transform(chk, enc, cb) {
    cb(null, { chk: chk });
  }
});

function runDataStreaming() {
  console.log("inside run-data-streaming()");

  const strm = client.dataStreaming((err, ret) => {
    if (err) {
      console.log("client : file transfer failed.");
      console.log(err);
    } else {
      console.log("client : file transfer succeeded.");
    }
  });

  stream.pipeline(fs.createReadStream("test.txt"), MyTransform, strm, err => {
    if (err) {
      console.log(err.message);
    } else {
      console.log("pipeline succeeded");
    }
  });
}

if (require.main === module) {
  runDataStreaming();
}

1 Ответ

0 голосов
/ 14 января 2020

Я решил свою проблему. были некоторые проблемы. Я объясню это.

  1. Глобальная переменная myTransformStream на сервере. js была проблема. Я переместил код в функцию dataStreaming.

  2. Я также изменил параметры закрытия, уничтожения. Если говорить точнее, я включил опцию autoclose, emitclose в fs.createReadStream(), fs.createWriteStream() и ... включил опцию emitclose, autodestroy в потоке преобразования (myTransformStream и myTransform)

...