Я разработал приложение с помощью gRPC servicer.Суть моего приложения:
У сервера gRPC (класс DexFxServicer в приведенном ниже коде) есть метод Transmit, который вызывается клиентом gRPC снаружи.
Метод передачи создает несколько каналов и заглушек для разных хостов из hostList.
Далее приложение создает пул процессов и запускает его.
Каждый дочерний процесс вызывает метод gRPC SendHostListAndGetMetrics для своей собственной заглушки и получает итератор ответа.
Этот код работает хорошо, приложение вызывает метод Transmit и получает все необходимые результаты из пула процессов.Но я заметил, что когда внешний клиент gRPC несколько раз вызывает метод Transmit, этот код не закрывал некоторые из его дочерних процессов.И это приводит к созданию дополнительных незакрытых процессов, как показывает htop.Когда я пытаюсь закрыть каналы gRPC методом channel.close (), дополнительные процессы создаются более интенсивно.
Python 2.7.12 grpcio == 1.16.1 grpcio-tools == 1.16.1 Ubuntu 16.04.6 LTS 4.4.0-143-generic
from concurrent import futures
import sleep
import grpc
import sys
import cascade_pb2
import cascade_pb2_grpc
import metrics_pb2
import metrics_pb2_grpc
from multiprocessing import Pool
class DexFxServicer(cascade_pb2_grpc.DexFxServicer):
def __init__(self, args):
self.args = args
def Transmit(self, request, context):
entrypoint = request.sender.host_address # entrypoint is a string
hostList = [] # hostList is a list of strings
for rec in request.sender.receiver:
hostList.append(rec.host_address)
channels = {}
stubs = {}
for host in hostList:
try:
channels[host] = grpc.insecure_channel('%s:%d' % (host, self.args.cascadePort))
except Exception as e:
print(e)
sys.exit(0)
else:
stubs[host] = metrics_pb2_grpc.MetricsStub(channels[host])
def collect_metrics(host):
mtrx = []
hosts = (metrics_pb2.Host(hostname = i) for i in hostList + [entrypoint])
for i in stubs[host].SendHostListAndGetMetrics(hosts):
mtrx.append(i.mtrx)
return mtrx
pool = Pool(len(hostList))
results = pool.map(collect_metrics, hostList)
pool.close()
pool.terminate()
pool.join()
# Return the iterator of the results
Я ожидаю увидеть код, который не создает дополнительных незакрытых процессов.Пожалуйста, предложите мне, что делать в этом случае.