Может ли кто-нибудь предоставить мне простой пример настройки потокового файла в пользовательском процессоре Nifi, чтобы полезная нагрузка могла быть отправлена через процессор PublishKafka?
У меня есть устаревший протокол обмена сообщениями, для которого я написал собственный процессор. Довольно простая структура, просто MessageID (String) и MessageBody (byte []). Мой пользовательский процессор обрабатывает ввод с полученными сообщениями в порядке. Сейчас я пытаюсь поместить эти данные в потоковый файл, чтобы их можно было отправить на процессор publishKafka, но у меня возникли проблемы с поиском в Интернете каких-либо ресурсов, как это сделать. Вот мой текущий фрагмент кода соответствующей части:
try {
this.getLogger().info("[INFO - ListenMW] - Message Received: " +
data.getMsgID().toString() + " Size: " +
data.getMsgData().length);
this.currentSession.adjustCounter("MW Counter", 1, true);
// Setup the flowfile to transfer
FlowFile flowfile = this.currentSession.create();
flowfile = this.currentSession.putAttribute(flowfile, "key",data.getMsgID().toString());
flowfile = this.currentSession.putAttribute(flowfile, "value", new String(data.getMsgData(),StandardCharsets.UTF_8));
this.currentSession.transfer(flowfile, SUCCESS);
}catch(Exception e) {
this.getLogger().error("[INFO - ListenMW] - "+e.getMessage());
this.currentSession.adjustCounter("MW Failure", 1, true);
}
Я не смог определить, какие атрибуты использовать для msgID и msgData, поэтому я создал свой собственный. Я видел один пост, в котором кто-то порекомендовал создать вашу собственную json структуру и отправить ее через вашу полезную нагрузку, но опять же, какой атрибут вы бы отправили через это, чтобы он правильно отображался в сообщении kafka? Я довольно новичок в Kafka и до сих пор экспериментировал только с рудиментарными контрольными примерами, так что прости мое невежество за любые неправильные предположения.
Спасибо за любые рекомендации! Я использую Kafka2.0.1 и процессор PublishKafka_2.0.