Я пишу SinkConnector в Kafka Connect и решаю проблему. Этот разъем имеет такую конфигурацию:
{
"connector.class" : "a.b.ExampleFileSinkConnector",
"tasks.max" : '1',
"topics" : "mytopic",
"maxFileSize" : "50"
}
Я определяю конфигурацию коннектора следующим образом:
@Override public ConfigDef config()
{
ConfigDef result = new ConfigDef();
result.define("maxFileSize", Type.STRING, "10", Importance.HIGH, "size of file");
return result;
}
В коннекторе запускаю задачи так:
@Override public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> result = new ArrayList<Map<String,String>>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> taskConfig = new HashMap<>();
taskConfig.put("connectorName", connectorName);
taskConfig.put("taskNumber", Integer.toString(i));
taskConfig.put("maxFileSize", maxFileSize);
result.add(taskConfig);
}
return result;
}
и все идет хорошо.
Однако при запуске Task (в taskConfigs ()), если я добавлю это:
taskConfig.put("epoch", "123");
это нарушает всю инфраструктуру: все соединители останавливаются и перезапускаются в бесконечном цикле.
В файле журнала подключений нет исключений или ошибок, которые могут помочь.
Единственный способ заставить его работать - добавить «эпоху» в конфиг коннектора, что я не хочу делать, поскольку это внутренний параметр, который коннектор должен отправить в задачу. Он не предназначен для показа пользователям соединителя.
Еще один момент, который я заметил, заключается в том, что невозможно обновить значение какого-либо параметра конфигурации соединителя, кроме как установить его в значение по умолчанию. Изменение параметра и отправка его в задачу приводит к тому же поведению.
Буду очень признателен за помощь в этом вопросе.
РЕДАКТИРОВАТЬ: вот код SinkTask :: start ()
@Override public void start(Map<String, String> taskConfig) {
try {
connectorName = taskConfig.get("connectorName");
log.info("{} -- Task.start()", connectorName);
fileNamePattern = taskConfig.get("fileNamePattern");
rootDir = taskConfig.get("rootDir");
fileExtension = taskConfig.get("fileExtension");
maxFileSize = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("maxFileSize"));
maxTimeMinutes = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("maxTimeMinutes"));
maxNumRecords = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("maxNumRecords"));
taskNumber = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("taskNumber"));
epochStart = SimpleFileSinkConnector.parseLongConfig(taskConfig.get("epochStart"));
log.info("{} -- fileNamePattern: {}, rootDir: {}, fileExtension: {}, maxFileSize: {}, maxTimeMinutes: {}, maxNumRecords: {}, taskNumber: {}, epochStart : {}",
connectorName, fileNamePattern, rootDir, fileExtension, maxFileSize, maxTimeMinutes, maxNumRecords, taskNumber, epochStart);
if (taskNumber == 0) {
checkTempFilesForPromotion();
}
computeInitialFilename();
log.info("{} -- Task.start() END", connectorName);
} catch (Exception e) {
log.info("{} -- Task.start() EXCEPTION : {}", connectorName, e.getLocalizedMessage());
}
}