Проверка, чтобы видеть, когда поток gRPC пуст или не передает данные - PullRequest
0 голосов
/ 26 октября 2018

Я создаю заглушку, которая подключается к серверу, который передает данные с определенным интервалом, а затем загружаю его в TSDB.Я реализовал пакетную обработку, чтобы оптимизировать загрузку, но если объем данных, передаваемых за один интервал, не совпадает с размером пакета, некоторые данные не будут загружены до следующего интервала, что мне не нужно.Есть ли способ на заглушке gRPC, чтобы проверить, является ли поток пустым?

class DialInClient(object):
    def __init__(self, host, port, timeout=100000000, user='root', password='lablab'):
        self._host = host
        self._port = port
        self._timeout = float(timeout)
        self._channel = None
        self._cisco_ems_stub = None
        self._connected = False
        self._metadata = [('username', user), ('password', password)]

    def subscribe(self, sub_id):
        sub_args = CreateSubsArgs(ReqId=1, encode=3, subidstr=sub_id)
        stream = self._cisco_ems_stub.CreateSubs(sub_args, timeout=self._timeout, metadata=self._metadata)
        for segment in stream:
            yield segment 

    def connect(self):
        self._channel = grpc.insecure_channel(':'.join([self._host,self._port]))
        try:
            grpc.channel_ready_future(self._channel).result(timeout=10)
            self._connected = True
        except grpc.FutureTimeoutError as e:
            raise DeviceFailedToConnect from e
        else:
            self._cisco_ems_stub = gRPCConfigOperStub(self._channel)

Если я установлю малое время ожидания, весь канал отключится, я хочу добавить какое-то время ожидания в цикле forпотоковая передача, чтобы увидеть, не получу ли я еще один сегмент за 1 секунду, выдаст None, чтобы сообщить моей другой части, что это конец, и загрузить без полного размера пакета.

1 Ответ

0 голосов
/ 05 ноября 2018

Такой механизм изначально не существует в GRPC, но библиотека threading должна позволять отправлять пакеты до того, как они заполнятся. Я включил измененную версию примера *1003* * GR3 * Python GRPC, чтобы дать вам представление о том, как это можно сделать.

from __future__ import print_function                                                                                                        

import grpc                                                                                                                                  

import helloworld_pb2
import helloworld_pb2_grpc                                                                                                                   

import threading
from six.moves import queue
import time 

# 10 second batches    
BATCH_PERIOD = 10.0

def collect_responses(resp_queue, finished):                                                                                                 
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = helloworld_pb2_grpc.GreeterStub(channel)                                                                                      
        for i, response in enumerate(stub.SayHello(helloworld_pb2.HelloRequest(name='you', num_greetings="100"))):                           
            resp_queue.put(response)                                                                                                         
    finished.set()                                                                                                                           

def is_batch_end(batch_start):                                                                                                               
    return time.time() - batch_start < BATCH_PERIOD                                                                                          

def get_remaining_time(time_start):                                                                                                          
    return (time_start + BATCH_PERIOD) - time.time()

def batch_responses(resp_queue, finished):
    batch_num = 0
    while True:        
        batch_resps = []
        batch_start = time.time()
        remaining_time = get_remaining_time(batch_start)                                                                                     
        while remaining_time > 0.0 and not finished.is_set():
            try:       
                batch_resps.append(resp_queue.get())                                                                                         
            except queue.Empty:                                                                                                              
                pass                                                                                                                         
            finally:
                remaining_time = get_remaining_time(batch_start)
        print("Batch {} ({}):".format(batch_num + 1, len(batch_resps)))                                                                      
        for resp in batch_resps:                                                                                                             
            print("  '{}'".format(resp.message))
        batch_num += 1

def run():                                                                                                                                   
    resp_queue = queue.Queue()
    finished = threading.Event()                                                                                                             
    client_thread = threading.Thread(target=collect_responses, args=(resp_queue, finished))                                                  
    client_thread.start()
    batch_responses(resp_queue, finished)                                                                                                    
    client_thread.join()

if __name__ == '__main__':                                                                                                                   
    run() 
...