Я пытаюсь создать пользовательский соединитель, следуя шаблону, используемому в примерах
https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
https://github.com/confluentinc/kafka-connect-insert-uuid/blob/master/src/main/java/com/github/cjmatta/kafka/connect/smt/InsertUuid.java
Я переопределил метод apply, чтобы добавить небольшую строку в полезную нагрузку
public R apply(R record) {
log.info("Transformation apply has started...");
String newValue = record.value().toString() + " this has been transformed" ;
log.info("Message changed to : " + newValue);
return newRecord(record, null, newValue);
}
Это никак не влияет. Выход не изменился. Я также не вижу информационных сообщений журнала в сгенерированном журнале.
Однако, если я переопределил метод newRecord класса Value, я смогу увидеть, что сообщение обновляется. Также я вижу информационные сообщения, которые регистрируются.
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
log.info("Message Value being changed to :");
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
updatedSchema, "This is updated : " + updatedValue.toString(), record.timestamp());
}
Это приводит к обновлению выходного сообщения, как и ожидалось. Обратите внимание, что полезная нагрузка в сообщении является строковой полезной нагрузкой и не имеет какой-либо конкретной структуры.
Я думал, что эта манипуляция полезной нагрузкой может происходить и в методе apply (). Также я не вижу, чтобы этот метод apply () вызывался в прогоне преобразования. На полезную нагрузку это не влияет, и сообщения не регистрируются.
Я что-то здесь упускаю. Метод apply () вызывается или используется неправильно. Любое руководство приветствуется.
Примечание. Также работает переопределение метода newRecord класса ключей.
Полный исходный код преобразования приведен ниже
package com.xxxxx.yyyyy.kafka.connect.transform;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.transforms.HoistField;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public abstract class SampleTransform<R extends ConnectRecord<R>> implements Transformation<R> {
private static Logger log = LoggerFactory.getLogger(SampleTransform.class);
@Override
public void configure(Map<String, ?> map) {
}
@Override
public R apply(R record) {
log.info("Transformation apply has started..."); //This does not work
String newValue = record.value().toString() + " this has been transformed" ;
log.info("Message changed to : " + newValue);
return newRecord(record, null, "Message is fully changed"); //Output does not change
}
protected abstract Schema operatingSchema(R record);
protected abstract Object operatingValue(R record);
protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
public static class Key<R extends ConnectRecord<R>> extends HoistField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.keySchema();
}
@Override
protected Object operatingValue(R record) {
return record.key();
}
@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
log.info("Key being changed to : " + updatedValue.toString()); //This is logged
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
}
}
public static class Value<R extends ConnectRecord<R>> extends HoistField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.valueSchema();
}
@Override
protected Object operatingValue(R record) {
return record.value();
}
@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
log.info("Message Value being changed to :"); // This is logged
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, "This is updated : " + updatedValue.toString(), record.timestamp()); //This works
}
}
}