Я создаю заглушку, которая подключается к серверу, который передает данные с определенным интервалом, а затем загружаю его в TSDB.Я реализовал пакетную обработку, чтобы оптимизировать загрузку, но если объем данных, передаваемых за один интервал, не совпадает с размером пакета, некоторые данные не будут загружены до следующего интервала, что мне не нужно.Есть ли способ на заглушке gRPC, чтобы проверить, является ли поток пустым?
class DialInClient(object):
def __init__(self, host, port, timeout=100000000, user='root', password='lablab'):
self._host = host
self._port = port
self._timeout = float(timeout)
self._channel = None
self._cisco_ems_stub = None
self._connected = False
self._metadata = [('username', user), ('password', password)]
def subscribe(self, sub_id):
sub_args = CreateSubsArgs(ReqId=1, encode=3, subidstr=sub_id)
stream = self._cisco_ems_stub.CreateSubs(sub_args, timeout=self._timeout, metadata=self._metadata)
for segment in stream:
yield segment
def connect(self):
self._channel = grpc.insecure_channel(':'.join([self._host,self._port]))
try:
grpc.channel_ready_future(self._channel).result(timeout=10)
self._connected = True
except grpc.FutureTimeoutError as e:
raise DeviceFailedToConnect from e
else:
self._cisco_ems_stub = gRPCConfigOperStub(self._channel)
Если я установлю малое время ожидания, весь канал отключится, я хочу добавить какое-то время ожидания в цикле forпотоковая передача, чтобы увидеть, не получу ли я еще один сегмент за 1 секунду, выдаст None
, чтобы сообщить моей другой части, что это конец, и загрузить без полного размера пакета.