Чтобы передать атрибуты, вы должны использовать заголовки Kafka, в противном случае невозможно передать атрибуты, так как они не являются частью тела файла потока, который станет телом сообщения в Kafka..
На стороне публикации PublishKafka_2_0 имеет следующее свойство, чтобы указать, какие атрибуты отправлять в качестве заголовков:
static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder()
.name("attribute-name-regex")
.displayName("Attributes to Send as Headers (Regex)")
.description("A Regular Expression that is matched against all FlowFile attribute names. "
+ "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. "
+ "If not specified, no FlowFile attributes will be added as headers.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.build();
На стороне потребления ConsumeKafka_2_0 имеет следующее свойство, чтобы указать, какие поля заголовкадобавить в качестве атрибутов:
static final PropertyDescriptor HEADER_NAME_REGEX = new PropertyDescriptor.Builder()
.name("header-name-regex")
.displayName("Headers to Add as Attributes (Regex)")
.description("A Regular Expression that is matched against all message headers. "
+ "Any message header whose name matches the regex will be added to the FlowFile as an Attribute. "
+ "If not specified, no Header values will be added as FlowFile attributes. If two messages have a different value for the same header and that header is selected by "
+ "the provided regex, then those two messages must be added to different FlowFiles. As a result, users should be cautious about using a regex like "
+ "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling "
+ "the messages together efficiently.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.build();