У меня проблемы с созданием UDF, которому нужен STREAM и который предоставляет BATCH.
Таким образом:
def info(self):
response = udf_pb2.Response()
response.info.wants = udf_pb2.STREAM
response.info.provides = udf_pb2.BATCH
response.info.options['field'].valueTypes.append(udf_pb2.STRING)
return response
Есть ли кто-нибудь с примером кода ?? Я искал по сети (foruns, документация), но все примеры для BATCH-BACH, STREAM-STREAM или BATCH-STREAM.
Я видел в примерах, что при написании ответа для Kapacitor в методе «end_batch (self, end_req)» необходимо как бы «связать», что BATCH закончился, в примере это было сделано таким образом :
def end_batch(self, end_req):
# Send begin batch with count of outliers
self._begin_response.begin.size = len(self._batch)
self._agent.write_response(self._begin_response)
response = udf_pb2.Response()
...
# Send an identical end batch back to Kapacitor
# HERE
response.end.CopyFrom(end_req)
self._agent.write_response(response)
Чтобы отправить BATCH, я должен отправить его из метода «point (self, point)», но не могу получить доступ к объекту end_req и не знаю, как его создать.
Заранее спасибо!
Пока, пока!