Создание Kapacitor UDF, который хочет STREAM и предоставляет BATCH (Python) - PullRequest
0 голосов
/ 05 июля 2018

У меня проблемы с созданием UDF, которому нужен STREAM и который предоставляет BATCH.

Таким образом:

def info(self):
    response = udf_pb2.Response()
    response.info.wants = udf_pb2.STREAM
    response.info.provides = udf_pb2.BATCH
    response.info.options['field'].valueTypes.append(udf_pb2.STRING)
    return response

Есть ли кто-нибудь с примером кода ?? Я искал по сети (foruns, документация), но все примеры для BATCH-BACH, STREAM-STREAM или BATCH-STREAM.

Я видел в примерах, что при написании ответа для Kapacitor в методе «end_batch (self, end_req)» необходимо как бы «связать», что BATCH закончился, в примере это было сделано таким образом :

def end_batch(self, end_req):
    # Send begin batch with count of outliers
    self._begin_response.begin.size = len(self._batch)
    self._agent.write_response(self._begin_response)

    response = udf_pb2.Response()


                                  ...    


    # Send an identical end batch back to Kapacitor
        # HERE
    response.end.CopyFrom(end_req)
    self._agent.write_response(response)

Чтобы отправить BATCH, я должен отправить его из метода «point (self, point)», но не могу получить доступ к объекту end_req и не знаю, как его создать.

Заранее спасибо! Пока, пока!

1 Ответ

0 голосов
/ 28 августа 2018

Надеюсь, что это все еще актуально, я бы сделал UDF STREAM-STREAM и направил его в оконный узел. Вы можете сохранить копию окна данных, как в примере с их скользящим средним, и выполнить любой пакетный анализ по этому вопросу. Если бы вы выяснили, как написать UDF STREAM-BATCH, я бы с удовольствием посмотрел его, хотя и менее безобразно, чем мой ответ.

Редактировать

jdv определенно был прав, мой последний ответ был скорее комментарием. Вот код STREAM-BATCH UDF в python, он просто повторяет данные, поступившие в поток в пакете. Он все еще немного сломан, потому что не может сериализовать класс точек в методе снимка обработчика. Таким образом, всякий раз, когда ему нужно сделать снимок, он падает, его можно решить, используя другой метод сериализации, такой как выборка, или записав кодировщик / декодер JSON для точки. Я найду время, чтобы исправить это, но моя рабочая неделя почти закончена. Главное, что вам нужно сделать, чтобы создать UDF STREAM-BATCH, - это создать сообщения начала и конца пакета, что делается в методах createEndBatch и createStartBatch соответственно.

Редактировать 2

Исправлена ​​сериализация с использованием комбинации метода protobufs и json.

...