Объект gRPC создает дополнительные процессы, которые не закрываются - PullRequest
0 голосов
/ 25 сентября 2019

Я разработал приложение с помощью gRPC servicer.Суть моего приложения:

  1. У сервера gRPC (класс DexFxServicer в приведенном ниже коде) есть метод Transmit, который вызывается клиентом gRPC снаружи.

  2. Метод передачи создает несколько каналов и заглушек для разных хостов из hostList.

  3. Далее приложение создает пул процессов и запускает его.

  4. Каждый дочерний процесс вызывает метод gRPC SendHostListAndGetMetrics для своей собственной заглушки и получает итератор ответа.

Этот код работает хорошо, приложение вызывает метод Transmit и получает все необходимые результаты из пула процессов.Но я заметил, что когда внешний клиент gRPC несколько раз вызывает метод Transmit, этот код не закрывал некоторые из его дочерних процессов.И это приводит к созданию дополнительных незакрытых процессов, как показывает htop.Когда я пытаюсь закрыть каналы gRPC методом channel.close (), дополнительные процессы создаются более интенсивно.

Python 2.7.12 grpcio == 1.16.1 grpcio-tools == 1.16.1 Ubuntu 16.04.6 LTS 4.4.0-143-generic

from concurrent import futures
import sleep
import grpc
import sys
import cascade_pb2
import cascade_pb2_grpc
import metrics_pb2
import metrics_pb2_grpc
from multiprocessing import Pool

class DexFxServicer(cascade_pb2_grpc.DexFxServicer):

def __init__(self, args):
    self.args = args

def Transmit(self, request, context):
    entrypoint = request.sender.host_address # entrypoint is a string
    hostList = []   # hostList is a list of strings
    for rec in request.sender.receiver:
        hostList.append(rec.host_address)

    channels = {}
    stubs = {}

    for host in hostList:
        try:
            channels[host] = grpc.insecure_channel('%s:%d' % (host, self.args.cascadePort))
        except Exception as e:
            print(e)
            sys.exit(0)
        else:
            stubs[host] = metrics_pb2_grpc.MetricsStub(channels[host])

    def collect_metrics(host):
        mtrx = []
        hosts = (metrics_pb2.Host(hostname = i) for i in hostList + [entrypoint])
        for i in stubs[host].SendHostListAndGetMetrics(hosts):
            mtrx.append(i.mtrx)
        return mtrx

    pool = Pool(len(hostList))
    results = pool.map(collect_metrics, hostList)
    pool.close()
    pool.terminate()
    pool.join()

   # Return the iterator of the results

Я ожидаю увидеть код, который не создает дополнительных незакрытых процессов.Пожалуйста, предложите мне, что делать в этом случае.

1 Ответ

0 голосов
/ 27 сентября 2019

Проблема была решена путем обновления версии grpcio до 1.23.0. выпуск gRPC

...