Запрос коннектора JDBC Kafka Connect + дроссели в инкрементном режиме с большими наборами данных при первом опросе - PullRequest
0 голосов
/ 29 марта 2019

Я использую соединитель JDBC для перемещения данных из MySQL в Kafka. Интересующие меня данные поступают из избранных, объединяющих 3 таблицы, поэтому я настроил свой соединитель на mode:incrementing и query:

{
    "name": "stats",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
        "connection.url": "jdbc:mysql://DB_HOST:3306/SCHEMA?user=USER&password=PASSWORD&zeroDateTimeBehavior=CONVERT_TO_NULL&useSSL=false",
        "mode": "incrementing",
        "validate.non.null": "false",
        "topic.prefix": "t",
        "incrementing.column.name": "s.id",
        "transforms": "createKey,extractString",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields": "uuid",
        "transforms.extractString.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractString.field": "uuid",
        "quote.sql.identifiers":"never",
        "query": "select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id",
        "errors.tolerance": "all",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "batch.max.rows": "100",
        "poll.interval.ms": "60000"
    }
}

При проверке состояния разъема я получаю, что работает:

curl http://conncet:8083/connectors/stats/status

{
    "name": "stats",
    "connector": {
        "state": "RUNNING",
        "worker_id": "connect-3:38083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "connect-1:18083"
        }
    ],
    "type": "source"
}

Но через час я все еще не вижу созданную тему. Я проверил в MySQL, какие запросы выполняются с show full processlist;, и я вижу два таких запроса:

select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id WHERE s.id > -1 ORDER BY s.id ASC

Таким образом, запрос в основном совпадает с запросом, который я предоставил в query в конфигурации соединителя плюс WHERE s.id > -1 ORDER BY s.id ASC, так как запрос в этом объединении создает набор из 21 миллионной строки, MySQL отправляет данные в течение длительного времени , Когда я проверяю снова с помощью show full processlist;, я вижу теперь 4 таких запроса, затем 8, затем 16 и так далее.

Вопросы:

  1. Почему Kafka connect пытается получить ALL строк сразу при добавлении: s.id > -1 ORDER BY s.id ASC.
  2. Можно ли настроить соединитель так, чтобы он этого не делал, а вместо этого извлекал меньшее количество?
  3. "batch.max.rows": "100" контролирует только размер пакета после первоначального опроса ??

Обновление:

Существует открытая тема для этой проблемы . Я думаю, что этот вопрос можно закрыть.

1 Ответ

0 голосов
/ 29 марта 2019

JDBC Source Connector с incrementing mode и передачей query, выполните этот запрос со следующим предложением where: WHERE incrementingColumnName > lastIncrementedValue ORDER BY incrementingColumnName ASC.(если вы используете инкрементный режим и запрос, вы не можете пропустить предложение where).

При первом опросе lastIncrementedValue равен -1, поэтому он пытается запросить все записи.После извлечения каждой записи lastIncrementedValue увеличивается, поэтому при следующем запросе будут опрошены только новые данные.batch.max.rows указывает, сколько записей SourceTask::poll(...) вернется в среду Kafka Connect.Это максимальный размер пакета, который будет сразу отправлен в Kafka.

Я думаю, что когда вы выбираете данные из одной таблицы, это работает быстрее, потому что запрос выполняется быстрее (менее сложно).Если вы выполните эти запросы, используя другие инструменты SQL, они будут выполнять аналогичные действия.

...