Измените конфигурацию соединителя в KafkaConnect перед отправкой в ​​задачу - PullRequest
0 голосов
/ 25 января 2019

Я пишу SinkConnector в Kafka Connect и решаю проблему. Этот разъем имеет такую ​​конфигурацию:

{
    "connector.class" : "a.b.ExampleFileSinkConnector",
    "tasks.max" : '1',
    "topics" : "mytopic",
    "maxFileSize" : "50"
}

Я определяю конфигурацию коннектора следующим образом:

@Override public ConfigDef config()
  {
    ConfigDef result = new ConfigDef();
    result.define("maxFileSize", Type.STRING, "10", Importance.HIGH, "size of file");
    return result;
  }

В коннекторе запускаю задачи так:

@Override public List<Map<String, String>> taskConfigs(int maxTasks) {
  List<Map<String, String>> result = new ArrayList<Map<String,String>>();
  for (int i = 0; i < maxTasks; i++) {
    Map<String, String> taskConfig = new HashMap<>();
    taskConfig.put("connectorName",   connectorName);
    taskConfig.put("taskNumber",      Integer.toString(i));
    taskConfig.put("maxFileSize",     maxFileSize);
    result.add(taskConfig);
  }
  return result;
}

и все идет хорошо.

Однако при запуске Task (в taskConfigs ()), если я добавлю это:

taskConfig.put("epoch", "123");

это нарушает всю инфраструктуру: все соединители останавливаются и перезапускаются в бесконечном цикле.

В файле журнала подключений нет исключений или ошибок, которые могут помочь.

Единственный способ заставить его работать - добавить «эпоху» в конфиг коннектора, что я не хочу делать, поскольку это внутренний параметр, который коннектор должен отправить в задачу. Он не предназначен для показа пользователям соединителя.

Еще один момент, который я заметил, заключается в том, что невозможно обновить значение какого-либо параметра конфигурации соединителя, кроме как установить его в значение по умолчанию. Изменение параметра и отправка его в задачу приводит к тому же поведению.

Буду очень признателен за помощь в этом вопросе.

РЕДАКТИРОВАТЬ: вот код SinkTask :: start ()

@Override public void start(Map<String, String> taskConfig) {
  try {
    connectorName   = taskConfig.get("connectorName");
    log.info("{} -- Task.start()", connectorName);
    fileNamePattern = taskConfig.get("fileNamePattern");
    rootDir         = taskConfig.get("rootDir");
    fileExtension   = taskConfig.get("fileExtension");
    maxFileSize     = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("maxFileSize"));
    maxTimeMinutes  = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("maxTimeMinutes"));
    maxNumRecords   = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("maxNumRecords"));
    taskNumber      = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("taskNumber"));
    epochStart      = SimpleFileSinkConnector.parseLongConfig(taskConfig.get("epochStart"));
    log.info("{} -- fileNamePattern: {}, rootDir: {}, fileExtension: {}, maxFileSize: {}, maxTimeMinutes: {}, maxNumRecords: {}, taskNumber: {}, epochStart : {}",
            connectorName, fileNamePattern, rootDir, fileExtension, maxFileSize, maxTimeMinutes, maxNumRecords, taskNumber, epochStart);
    if (taskNumber == 0) {
      checkTempFilesForPromotion();
    }
    computeInitialFilename();
    log.info("{} -- Task.start() END", connectorName);
  } catch (Exception e) {
    log.info("{} -- Task.start() EXCEPTION : {}", connectorName, e.getLocalizedMessage());
  }
}

1 Ответ

0 голосов
/ 29 января 2019

Мы нашли причину проблемы. Платформа Kafka Connect на самом деле ведет себя так, как задумано - проблема связана с тем, как мы пытаемся использовать инфраструктуру конфигурации taskConfigs.

Проблема

В нашем проекте FileSinkConnector устанавливает эпоху в своем методе жизненного цикла start (), и эта эпоха передается его задачам посредством метода жизненного цикла taskConfigs (). Таким образом, каждый раз, когда запускается метод жизненного цикла Connector start (), для задач генерируется различная конфигурация, что является проблемой.

Создание разных конфигураций каждый раз - нет-нет. Оказывается, Connect Framework обнаруживает различия в конфигурации и перезапускает / восстанавливает баланс при обнаружении - остановке и перезапуске соединителя / задачи. Этот перезапуск вызовет методы коннектора stop () и start () ... что (конечно) вызовет еще одно изменение конфигурации (из-за новой эпохи), и порочный цикл включен!

Это была интересная и неожиданная проблема ... из-за поведения в Connect, которое мы не ценили. Это первый случай, когда мы пытались создать конфигурацию задачи, которая не была простой функцией конфигурации соединителя.

Обратите внимание, что такое поведение в Connect является преднамеренным и решает реальные проблемы с динамически изменяющейся конфигурацией - например, коннектор JDBC Sink Connector, который самопроизвольно обновляет свою конфигурацию при обнаружении новой таблицы базы данных, которую он хочет поглотить.

Спасибо тем, кто нам помог!

...