gRPC + Тема локальной проблемы - PullRequest
0 голосов
/ 26 ноября 2018

Я собираю сервер grpc с python и пытаюсь обработать некоторое локальное хранилище потоков с помощью werkzeug Local и LocalProxy, аналогично тому, что делает колба.

Проблема, с которой я сталкиваюсь, заключается в том, что когда я сохраняю некоторыеданные в локальном от сервера-перехватчика, а затем попытаться получить их из обслуживающего устройства, локальный пустой.Реальная проблема заключается в том, что по какой-то причине перехватчик работает в другом гринлете, чем обслуживающий сервер, поэтому обмен данными между запросами невозможен, так как werkzeug.local. storage заканчивается разными ключами для данныхпредполагается, что он принадлежит одному и тому же запросу.

То же самое происходит с использованием библиотеки потоков Python, похоже, что перехватчики запускаются из основного потока или потока, отличного от обслуживающих.Есть ли обходной путь для этого?Я ожидал, что перехватчики будут работать в том же потоке, что позволяет делать такие вещи.

# Define a global somewhere
from werkzeug.local import Local
local = Local()

# from an interceptor save something
local.message = "test msg"

# from the service access it
local.service_var = "test"
print local.message  # this throw a AttributeError

# print the content of local
print local.__storage__  # we have 2 entries in the storage, 2 different greenlets, but we are in the same request.

1 Ответ

0 голосов
/ 29 ноября 2018

перехватчик действительно запускается в обслуживающем потоке, который отличается от потока обработки.Обслуживающий поток отвечает за обслуживание обслуживающих и перехватывает обработчики обслуживающих устройств.После того, как обработчик метода serverer будет возвращен перехватчиками, обслуживающий поток отправит его в thread_pool at _server.py # L525 :

# Take unary unary call as an example.
# The method_handler is the returned object from interceptor.
def _handle_unary_unary(rpc_event, state, method_handler, thread_pool):
    unary_request = _unary_request(rpc_event, state,
                                   method_handler.request_deserializer)
    return thread_pool.submit(_unary_response_in_pool, rpc_event, state,
                              method_handler.unary_unary, unary_request,
                              method_handler.request_deserializer,
                              method_handler.response_serializer)

Что касается обходного пути, я могутолько представьте, что передаете экземпляр хранилища как перехватчику, так и серверу во время инициализации.После этого хранилище можно использовать как переменную-член.

class StorageServerInterceptor(grpc.ServerInterceptor):

    def __init__(self, storage):
        self._storage = storage

    def intercept_service(self, continuation, handler_call_details):
        key = ...
        value = ...
        self._storage.set(key, value)
        ...
        return continuation(handler_call_details)

class Storage(...StorageServicer):

    def __init__(self, storage):
        self._storage = storage

    ...Servicer Handlers...
...