Невозможно использовать JdbcSourceConnector Кафки для чтения данных из базы данных Oracle в тему kafka - PullRequest
0 голосов
/ 12 июня 2019

Я пытаюсь написать отдельную Java-программу, использующую API-интерфейс kafka-jdbc-connect для потоковой передачи данных из таблицы оракулов в тему kafka.

Используемый API: в настоящее время я пытаюсь использовать Kafka Connectors, точнее, класс JdbcSourceConnector.

Ограничение: используйте Confluent Java API и не делайте этого через CLI или путем выполнения предоставленного сценария оболочки.

Что я сделал: создал экземпляр класса JdbcSourceConnector.java и вызвал метод start (Properties) этого класса, указав объект Properties в качестве параметра. Этот объект свойств имеет свойства подключения к базе данных, свойство белого списка таблиц, префикс темы и т. Д.

После запуска темы я не могу прочитать данные из темы "topic-prefix-tablename". Я не уверен, как передать детали Kafka Broker в JdbcSourceConnector. Вызов метода start () в начальном потоке JdbcSourceConnector, но бездействие. Существует ли простая кодовая страница учебного руководства по API Java, на которую я могу сослаться, потому что все примеры, которые я вижу, используют сценарии командной строки / оболочки?

Любая помощь приветствуется

Код:

    public static void main(String[] args) {

        Map<String, String> jdbcConnectorConfig = new HashMap<String, String>();
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "<DATABASE_URL>");
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "<DATABASE_USER>");
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG, "<DATABASE_PASSWORD>");
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.POLL_INTERVAL_MS_CONFIG, "300000");
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.BATCH_MAX_ROWS_CONFIG, "10");
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.MODE_CONFIG, "timestamp");
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "<TABLE_NAME>");
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.TIMESTAMP_COLUMN_NAME_CONFIG, "<TABLE_COLUMN_NAME>");
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG, "test-oracle-jdbc-");


        JdbcSourceConnector jdbcSourceConnector = new JdbcSourceConnector ();
        jdbcSourceConnector.start(jdbcConnectorConfig);

    }

1 Ответ

0 голосов
/ 18 июня 2019

Если вы пытаетесь сделать это в автономном режиме.

В конфигурации запуска вашего приложения ваш главный класс должен быть "org.apache.kafka.connect.cli.ConnectStandalone", и вам нужно передать два файла свойств в качестве аргументов программы.

Вы также должны расширить класс «your-custom-JdbcSourceConnector» с помощью класса «org.apache.kafka.connect.source.SourceConnector»

Основной класс: org.apache.kafka.connect.cli.ConnectStandalone

Аргументы программы: . \ Path-to-config \ connect-standalone.conf. \ Path-to-config \ connetcor.properties

"connect-standalone.conf" файл будет содержать все сведения о брокере Kafka.

// Example connect-standalone.conf
bootstrap.servers=<comma seperated brokers list here>

group.id=some_loca_group_id

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=connect.offset

offset.flush.interval.ms=100
offset.flush.timeout.ms=180000
buffer.memory=67108864
batch.size=128000
producers.acks=1
Файл

" connector.properties " будет содержать все сведения, необходимые для создания и запуска соединителя

// Example connector.properties
name=some-local-connector-name
connector.class=your-custom-JdbcSourceConnector    
tasks.max=3
topic=output-topic
fetchsize=10000

Подробнее здесь: https://docs.confluent.io/current/connect/devguide.html#connector-example

...