Я пытаюсь написать отдельную Java-программу, использующую API-интерфейс kafka-jdbc-connect для потоковой передачи данных из таблицы оракулов в тему kafka.
Используемый API: в настоящее время я пытаюсь использовать Kafka Connectors, точнее, класс JdbcSourceConnector.
Ограничение: используйте Confluent Java API и не делайте этого через CLI или путем выполнения предоставленного сценария оболочки.
Что я сделал: создал экземпляр класса JdbcSourceConnector.java и вызвал метод start (Properties) этого класса, указав объект Properties в качестве параметра. Этот объект свойств имеет свойства подключения к базе данных, свойство белого списка таблиц, префикс темы и т. Д.
После запуска темы я не могу прочитать данные из темы "topic-prefix-tablename". Я не уверен, как передать детали Kafka Broker в JdbcSourceConnector. Вызов метода start () в начальном потоке JdbcSourceConnector, но бездействие.
Существует ли простая кодовая страница учебного руководства по API Java, на которую я могу сослаться, потому что все примеры, которые я вижу, используют сценарии командной строки / оболочки?
Любая помощь приветствуется
Код:
public static void main(String[] args) {
Map<String, String> jdbcConnectorConfig = new HashMap<String, String>();
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "<DATABASE_URL>");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "<DATABASE_USER>");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG, "<DATABASE_PASSWORD>");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.POLL_INTERVAL_MS_CONFIG, "300000");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.BATCH_MAX_ROWS_CONFIG, "10");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.MODE_CONFIG, "timestamp");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "<TABLE_NAME>");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.TIMESTAMP_COLUMN_NAME_CONFIG, "<TABLE_COLUMN_NAME>");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG, "test-oracle-jdbc-");
JdbcSourceConnector jdbcSourceConnector = new JdbcSourceConnector ();
jdbcSourceConnector.start(jdbcConnectorConfig);
}