сопровождающие юнит-тесты служат довольно хорошим справочным материалом.
def start_server(self, non_blocking=False, thread_pool=None):
self._thread_pool = thread_pool
self._servicer = health.HealthServicer(
experimental_non_blocking=non_blocking,
experimental_thread_pool=thread_pool)
self._servicer.set('', health_pb2.HealthCheckResponse.SERVING)
self._servicer.set(_SERVING_SERVICE,
health_pb2.HealthCheckResponse.SERVING)
self._servicer.set(_UNKNOWN_SERVICE,
health_pb2.HealthCheckResponse.UNKNOWN)
self._servicer.set(_NOT_SERVING_SERVICE,
health_pb2.HealthCheckResponse.NOT_SERVING)
self._server = test_common.test_server()
port = self._server.add_insecure_port('[::]:0')
health_pb2_grpc.add_HealthServicer_to_server(
self._servicer, self._server)
self._server.start()
Сохраните ссылку на сервис-сервер работоспособности в главном классе сервис-серверов для вашего сервера приложений. Затем вызовите метод set()
для него в соответствующее время, например, когда запуск завершен или когда он переходит в состояние, в котором он не может выполнять запросы к серверу. Однако обратите внимание, что здесь используются некоторые экспериментальные функции, чтобы гарантировать, что подключенный сервисный центр работоспособности не приведет к истощению запросов на уровне приложений.