Spring Boot GRP C: ServerIntereceptor для чтения данных в запросе и установки их в ответе - PullRequest
0 голосов
/ 09 марта 2020

Существует поле под названием «метаданные» (не путать с метаданными GRP C), которое присутствует в каждом прото-запросе, который поступает в службу GRP C:

message MyRequest {
  RequestResponseMetadata metadata = 1;
  ...
}

И это же поле также присутствует во всех ответах:

message MyResponse {
  RequestResponseMetadata metadata = 1;
  ...
}

Я пытаюсь написать ServerInterceptor (или что-то еще, если это работает), чтобы прочитать поле «метаданные» из запроса, оставьте его где-то, а затем установите его в ответе после завершения обработки запроса.

Попытка 1: ThreadLocal

public class ServerInterceptor implements io.grpc.ServerInterceptor {

  private ThreadLocal<RequestResponseMetadata> metadataThreadLocal = new ThreadLocal<>();

  @Override
  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
      ServerCall<ReqT, RespT> call,
      final Metadata requestHeaders,
      ServerCallHandler<ReqT, RespT> next) {
    return new SimpleForwardingServerCallListener<ReqT>(
        next.startCall(
            new SimpleForwardingServerCall<ReqT, RespT>(call) {
              @Override
              public void sendMessage(RespT message) {
                super.sendMessage(
                    (RespT)
                        MetadataUtils.setMetadata(
                            (GeneratedMessageV3) message, metadataThreadLocal.get()));
                metadataThreadLocal.remove();
              }
            },
            requestHeaders)) {
      @Override
      public void onMessage(ReqT request) {
        // todo nava see if ReqT can extend GenericV3Message
        var metadata = MetadataUtils.getMetadata((GeneratedMessageV3) request);
        metadataThreadLocal.set(metadata);
        super.onMessage(request);
      }
    };
  }
}

Я пытался использовать ThreadLocal, чтобы позже понимают , что sendMessage и onMessage не обязательно должны быть в одном потоке.

Попытка 2: GRP C Контекст

public class ServerInterceptor implements io.grpc.ServerInterceptor {

  public static final Context.Key<RequestResponseMetadata> METADATA_KEY = Context.key("metadata");

  @Override
  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
      ServerCall<ReqT, RespT> call,
      final Metadata requestHeaders,
      ServerCallHandler<ReqT, RespT> next) {
    return new SimpleForwardingServerCallListener<ReqT>(
        next.startCall(
            new SimpleForwardingServerCall<ReqT, RespT>(call) {
              @Override
              public void sendMessage(RespT message) {
                super.sendMessage(
                    (RespT)
                        MetadataUtils.setMetadata(
                            (GeneratedMessageV3) message, METADATA_KEY.get()));
              }
            },
            requestHeaders)) {
      @Override
      public void onMessage(ReqT request) {
        var metadata = MetadataUtils.getMetadata((GeneratedMessageV3) request);
        var newContext = Context.current().withValue(METADATA_KEY, metadata);
        oldContext = newContext.attach();
        super.onMessage(request);
      }
    };
  }
}

Я планирую отключить контекст в onComplete(), но прежде чем он достигнет самого себя, METADATA_KEY.get() в sendMessage возвращает null, в то время как я ожидал, что он вернет данные.

Даже перед нажатием на sendMessage(), я получаю это в консоли, показывая, что я делаю что-то не так:

3289640 [grpc-default-executor-0] ERROR i.g.ThreadLocalContextStorage - Context was not attached when detaching
java.lang.Throwable: null
    at io.grpc.ThreadLocalContextStorage.detach(ThreadLocalContextStorage.java:48)
    at io.grpc.Context.detach(Context.java:421)
    at io.grpc.Context$CancellableContext.detach(Context.java:761)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:39)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

Как читать данные при получении запроса, хранить его где-нибудь и использовать при отправке ответа?

1 Ответ

0 голосов
/ 12 марта 2020

Вы можете использовать Метаданные для передачи значений из запроса в ответ:

public class MetadataServerInterceptor implements ServerInterceptor {

    public static final Metadata.Key<byte[]> METADATA_KEY = Metadata.Key.of("metadata-bin", Metadata.BINARY_BYTE_MARSHALLER);

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
        var serverCall = new ForwardingServerCall.SimpleForwardingServerCall<>(call) {
            @Override
            public void sendMessage(RespT message) {
                byte[] metadata = headers.get(METADATA_KEY);
                message = (RespT) MetadataUtils.setMetadata((GeneratedMessageV3) message, metadata);
                super.sendMessage(message);
            }
        };
        ServerCall.Listener<ReqT> listenerWithContext = Contexts.interceptCall(Context.current(), serverCall, headers, next);
        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(listenerWithContext) {
            @Override
            public void onMessage(ReqT message) {
                byte[] metadata = MetadataUtils.getMetadata((GeneratedMessageV3) message);
                headers.put(METADATA_KEY, metadata);
                super.onMessage(message);
            }
        };
    }
}

Примечание: Поскольку невозможно поместить экземпляр RequestResponseMetadata в метаданных (по крайней мере, без реализации собственного маршаллера) вы можете сохранить их там как байтовый массив. Вы можете использовать toByteArray() на вашем RequestResponseMetadata объекте, чтобы получить byte[] и RequestResponseMetadata.#parseFrom(byte[]), чтобы получить объект от byte[].

...