Я пытаюсь добавить новый пользовательский Kafka Converter , который является модификацией JsonConverterConfig в connect-json . Я пытаюсь добавить новое конвертируемое свойство, скажем "schemas.modifications.enable" в конвертере, расширяющем JsonConverterConfig. Но Kafka Connect не может найти информацию о конвертере.
Мой код сниппет:
public class ModifiedJsonConfig extends JsonConverterConfig {
public static final String SCHEMAS_MODIFY_CONFIG = "schemas.modifications.enable";
public static final boolean SCHEMAS_MODIFY_CONFIG_DEFAULT = true;
private static final String SCHEMAS_MODIFY_CONFIG_DOC = "The maximum number of schemas that can be cached in this converter instance.";
private static final String SCHEMAS_MODIFY_CONFIG_DISPLAY = "Schema Cache Size";
private final static ConfigDef CONFIG;
static {
String group = "Schemas-modification";
int orderInGroup = 0;
CONFIG = ConverterConfig.newConfigDef();
CONFIG.define(SCHEMAS_MODIFY_CONFIG, Type.BOOLEAN, SCHEMAS_MODIFY_CONFIG_DEFAULT, Importance.HIGH, SCHEMAS_MODIFY_CONFIG_DOC, group,
orderInGroup++, Width.MEDIUM, SCHEMAS_MODIFY_CONFIG_DISPLAY);
}
public static ConfigDef configDef() {
return CONFIG;
}
public ModifiedJsonConfig(Map<String, ?> props) {
super(props);
}
public boolean schemasModified() {
return getBoolean(SCHEMAS_MODIFY_CONFIG);
}
}
Но я получаю ошибку здесь:
ОШИБКА Остановка из-за ошибки (org.apache.kafka.connect.cli.ConnectDistributed: 83)
org.apache.kafka.common.config.ConfigException: неизвестная конфигурация 'schemas.modifications.enable'
Но я определил эту конфигурацию. Было бы очень полезно, если бы вы могли помочь мне установить здесь собственное свойство конвертера.
Заранее спасибо.