Создание собственного процессора в Nifi и отправка на PublishKafka - PullRequest
1 голос
/ 30 января 2020

Может ли кто-нибудь предоставить мне простой пример настройки потокового файла в пользовательском процессоре Nifi, чтобы полезная нагрузка могла быть отправлена ​​через процессор PublishKafka?

У меня есть устаревший протокол обмена сообщениями, для которого я написал собственный процессор. Довольно простая структура, просто MessageID (String) и MessageBody (byte []). Мой пользовательский процессор обрабатывает ввод с полученными сообщениями в порядке. Сейчас я пытаюсь поместить эти данные в потоковый файл, чтобы их можно было отправить на процессор publishKafka, но у меня возникли проблемы с поиском в Интернете каких-либо ресурсов, как это сделать. Вот мой текущий фрагмент кода соответствующей части:

    try {
         this.getLogger().info("[INFO - ListenMW] - Message Received: " +
                            data.getMsgID().toString() + " Size: " +
                            data.getMsgData().length);
         this.currentSession.adjustCounter("MW Counter", 1, true);

         // Setup the flowfile to transfer
         FlowFile flowfile = this.currentSession.create();
         flowfile = this.currentSession.putAttribute(flowfile, "key",data.getMsgID().toString());
         flowfile = this.currentSession.putAttribute(flowfile, "value", new String(data.getMsgData(),StandardCharsets.UTF_8));

         this.currentSession.transfer(flowfile, SUCCESS);


     }catch(Exception e) {
          this.getLogger().error("[INFO - ListenMW] - "+e.getMessage());
          this.currentSession.adjustCounter("MW Failure", 1, true);
     }

Я не смог определить, какие атрибуты использовать для msgID и msgData, поэтому я создал свой собственный. Я видел один пост, в котором кто-то порекомендовал создать вашу собственную json структуру и отправить ее через вашу полезную нагрузку, но опять же, какой атрибут вы бы отправили через это, чтобы он правильно отображался в сообщении kafka? Я довольно новичок в Kafka и до сих пор экспериментировал только с рудиментарными контрольными примерами, так что прости мое невежество за любые неправильные предположения.

Спасибо за любые рекомендации! Я использую Kafka2.0.1 и процессор PublishKafka_2.0.

1 Ответ

2 голосов
/ 01 февраля 2020

Исходя из того, чем вы поделились, похоже, что основная причина того, что вы ничего не публикуете в Kafka, заключается в том, что вы фактически ничего не записываете в содержимое потокового файла. Для справки: - это копия javadocs для NiFi (также, - это документация процессора ). Что вы должны делать, это примерно так:

flowFile = session.write(flowFile, outStream -> {
  outStream.write("some string here".getBytes());
});

Я использую PublishKafkaRecord, но процессоры PublishKafka концептуально очень похожи. Вы можете установить ключ для сообщения так, как вы это делаете там, но вам нужно установить значение, записав его в тело файла потока.

Не зная здесь вашего более широкого варианта использования, похоже, что вы можете сделать то, что вам нужно сделать с ExecuteScript. См. this как отправную точку для ExecuteScript с несколькими ссылками на язык сценариев.

Если вам нужна дополнительная помощь, у нас есть несколько вариантов здесь для вас.

...