Нужно объяснение передачи двоичных данных с помощью Thrift RPC - PullRequest
3 голосов
/ 11 августа 2011

Допустим, я определил следующую услугу Thrift

service FileResource {      
binary get_file(1:string file_name)
}

Вот созданная реализация, которую я не могу понять

public ByteBuffer recv_get_file() throws org.apache.thrift.TException
{
  org.apache.thrift.protocol.TMessage msg = iprot_.readMessageBegin();
  if (msg.type == org.apache.thrift.protocol.TMessageType.EXCEPTION) {
    org.apache.thrift.TApplicationException x = org.apache.thrift.TApplicationException.read(iprot_);
    iprot_.readMessageEnd();
    throw x;
  }
  if (msg.seqid != seqid_) {
    throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.BAD_SEQUENCE_ID, "get_file failed: out of sequence response");
  }
  get_file_result result = new get_file_result();
  result.read(iprot_);
  iprot_.readMessageEnd();
  if (result.isSetSuccess()) {
    return result.success;
  }
  throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "get_file failed: unknown result");
}

Как работает строка

 result.read(iprot_);

?Это синхронно или асинхронно?Как это будет работать для больших данных (несколько мегабайт и более)?И что мне нужно, чтобы прочитать эти данные?К сожалению, я не привык работать с java.nio и ByteBuffer.Любые примеры или руководства были бы хорошими.

Ответы [ 2 ]

3 голосов
/ 05 апреля 2013

Я думаю, вы неправильно поняли, для чего Apache Thrift .Если бы это было так сложно, Java NIO было бы проще ...

Как это будет работать для больших данных (несколько мегабайт и более)?

Забота должна заботитьсятранспортировать эти данные для вас.Как производительность?Это будет сильно зависеть от вашего оборудования и качества сети. Thrift имеет довольно хорошую производительность .

А что мне нужно для чтения этих данных?

В вашем клиенте Java Thrift вы можете сделать

TTransport transport;
transport = new TSocket("yourServerHostNameOrIPAddress", serverPort);
transport.open();
TProtocol protocol = new TBinaryProtocol(transport);
ChunkFileResourceThrift.Client client = new ChunkFileResourceThrift.Client(protocol);
ByteBuffer buffer = client.get_file(yourFileName);
// Do whatever you want with the byte buffer
transport.close();

Является ли он синхронным или асинхронным?

Если вы определили его как oneway в файле .thrift , он асинхронный, в противном случаеэто синхронно.Таким образом, в вашем случае это синхронно.

Необходимость реализации сетевых низкоуровневых деталей полностью превосходит цель использования Thrift.Thrift точно используется, поэтому вы можете забыть об этих деталях.

0 голосов
/ 27 августа 2011

Наконец мне удалось передать файл с сервера на клиент. Я расширил классы Client и Processor, автоматически сгенерированные Thrift. Это дало мне доступ к объекту TProtocol. Что, в свою очередь, позволяет отправлять / получать произвольные потоки данных.
Я уверен, что мое решение очень грубое. Было бы хорошо, если бы кто-то указал мне, как реализовать это в соответствии с архитектурой Thrift. Может ли это быть достигнуто лучше путем реализации пользовательского протокола Thrift?

клиент:

package alehro.droid;

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;

import alehro.log.Logger;
import alehro.tcp.ChunkFileResourceThrift;
import alehro.tcp.ServerSideError;

class ThriftClientExt extends ChunkFileResourceThrift.Client {
public ThriftClientExt(TProtocol prot) {
    super(prot);

}

public void recv_get_file_ext(String get_file_out_path) throws TException,
        IOException, ServerSideError {
    FileOutputStream fos = new FileOutputStream(get_file_out_path);
    FileChannel channel = fos.getChannel();
    int size = 0;
    // -1 - end of file, -2 exception.
    while ((size = iprot_.readI32()) > 0) {
        Logger.me.v("receiving buffer size=" + size);
        ByteBuffer out = iprot_.readBinary();
        // out.flip();
        channel.write(out);
    }
    if (size == -2) {
        String msg = iprot_.readString();
        Logger.me.e("Server error: " + msg);
        // TODO: report error to user
    }
    channel.close();
    recv_get_file();
}

}

Сервер:

package alehro.tcp;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;

import alehro.log.Logger;
import alehro.tcp.ChunkFileResourceThrift.Iface;
import alehro.tcp.ChunkFileResourceThrift.get_file_args;
import alehro.tcp.ChunkFileResourceThrift.get_file_result;

public class ChunkedFileResourceProcessor extends
    ChunkFileResourceThrift.Processor {

public interface IfaceExt extends Iface {
    void get_file_raw(String key, String file_name, TProtocol out)
            throws TException, ServerSideError;
}

final private IfaceExt iface_1;


public ChunkedFileResourceProcessor(IfaceExt iface) {
    super(iface);
    iface_1 = iface;
    // replace generated implementation by my custom one.
    processMap_.put("get_file", new get_file_raw());
}

private class get_file_raw implements ProcessFunction {

    @Override
    public void process(int seqid, TProtocol iprot, TProtocol oprot)
            throws TException {
        get_file_args args = new get_file_args();
        try {
            args.read(iprot);
        } catch (org.apache.thrift.protocol.TProtocolException e) {
            iprot.readMessageEnd();
            org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(
                    org.apache.thrift.TApplicationException.PROTOCOL_ERROR,
                    e.getMessage());
            oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(
                    "get_file",
                    org.apache.thrift.protocol.TMessageType.EXCEPTION,
                    seqid));
            x.write(oprot);
            oprot.writeMessageEnd();
            oprot.getTransport().flush();
            return;
        }
        iprot.readMessageEnd();
        get_file_result result = new get_file_result();
        try {
            iface_1.get_file_raw(args.key, args.file_name, oprot);
        } catch (ServerSideError ouch) {
            result.ouch = ouch;
        } catch (Throwable th) {
            Logger.me.e("Internal error processing get_file_raw");
            Logger.me.e(th.getMessage());
            Logger.me.e(th);
            org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(
                    org.apache.thrift.TApplicationException.INTERNAL_ERROR,
                    "Internal error processing get_file");
            oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(
                    "get_file",
                    org.apache.thrift.protocol.TMessageType.EXCEPTION,
                    seqid));
            x.write(oprot);
            oprot.writeMessageEnd();
            oprot.getTransport().flush();
            return;
        }
        oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(
                "get_file", org.apache.thrift.protocol.TMessageType.REPLY,
                seqid));
        result.write(oprot);
        oprot.writeMessageEnd();
        oprot.getTransport().flush();
    }

}

}

обработчик сервера:

public class ChunkedFileResourceHandler implements
    ChunkedFileResourceProcessor.IfaceExt {
....
@Override
public void get_file(String key, String file_name) throws TException {
    // stub
    throw new TException("Wrong call. Use get_file_raw instead.");
}

@Override
public void get_file_raw(String key, String file_name, final TProtocol out)
        throws ServerSideError, TException {
    // catch all here. mimic original get_file throw politics.
    try {
        Logger.me.v("Begin get_file_raw");
        UserSession se = accessUserSession(key, "get", 0, 0);
        vali(se != null);
        synchronized (se) {
            String fullPath = "";
            Logger.me.i("get file start: " + file_name);
            String userDir = AppConfig.getUserDir(se.info.email);
            fullPath = userDir + file_name;

            final FileInputStream inputFile;
            ByteBuffer buffer = null;
            int bytesRead = -1;
            FileChannel fileChannel = null;

            inputFile = new FileInputStream(fullPath);
            fileChannel = inputFile.getChannel();
            buffer = ByteBuffer.allocate(2048);
            bytesRead = fileChannel.read(buffer);

            // Logger.me.v("start sending file");
            while (bytesRead != -1) {
                buffer.flip();
                int length = buffer.limit() - buffer.position()
                        - buffer.arrayOffset();
                Logger.me.v("sending buffer length=" + length);

                out.writeI32(length); // read it in client
                out.writeBinary(buffer); // read it in client
                buffer.clear();

                bytesRead = fileChannel.read(buffer);

            }
            out.writeI32(-1); // read it in client

            Logger.me.i("get file end.");
        }
    } catch (TException e) {
        throw e;
    } catch (Throwable e) {
        write_get_file_exception(file_name, e, out);
        return;
    }

}

void write_get_file_exception(String file, Throwable e, final TProtocol out)
        throws TException {
    out.writeI32(-2);
    out.writeString("Exception in get_file_raw: file=" + file
            + "description=" + e.getMessage());
    Logger.me.e(e);
    Logger.me.i("get file ended wtih errors: " + e.getMessage());
}
}
...