У меня есть простая таблица базы данных в MySQL, которая имеет (id varchar(255), val varchar(255), ..., ...)
.
Я настроил Kafka Connect для потоковой передачи таблицы в тему (CONNECT_TOPIC) с двадцатью разделами.У меня есть другая тема (STREAM_TOPIC), которая заполнена производителем kafka с двадцатью разделами.
Проблема заключается в том, что ключи из карты соединителей для разных разделов в CONNECTOR_TOPIC отличаются от ключей в STREAM_TOPIC.Это означает, что я не могу присоединиться к двум темам в потоке.Я полагаю, что это потому, что идентификатор извлекается неправильно.
Вот пример этого вывода:
Stream Task ID 0_13 Partition number 13 Consumed CONNECT_EVENT 68f52084-cfc9-4997-a28e-57cfd4f7bbbf
Stream Task ID 0_13 Partition number 13 Consumed JOINED CONNECT_EVENT 68f52084-cfc9-4997-a28e-57cfd4f7bbbf
Stream Task ID 0_17 Partition number 17 Consumed STREAM_EVENT 68f52084-cfc9-4997-a28e-57cfd4f7bbbf
Stream Task ID 0_17 Partition number 17 Consumed JOINED STREAM_EVENT 68f52084-cfc9-4997-a28e-57cfd4f7bbbf
Stream Task ID 0_7 Partition number 7 Consumed STREAM_EVENT 32aaa88d-b175-4a54-8338-d542ed051e6a
Stream Task ID 0_7 Partition number 7 Consumed JOINED STREAM_EVENT 32aaa88d-b175-4a54-8338-d542ed051e6a
Stream Task ID 0_17 Partition number 17 Consumed CONNECT_EVENT 32aaa88d-b175-4a54-8338-d542ed051e6a
Stream Task ID 0_17 Partition number 17 Consumed JOINED CONNECT_EVENT 32aaa88d-b175-4a54-8338-d542ed051e6a
Stream Task ID 0_11 Partition number 11 Consumed CONNECT_EVENT 90265a93-adac-4e93-856c-d1498eeeb22e
Stream Task ID 0_11 Partition number 11 Consumed JOINED CONNECT_EVENT 90265a93-adac-4e93-856c-d1498eeeb22e
Stream Task ID 0_11 Partition number 11 Consumed STREAM_EVENT 90265a93-adac-4e93-856c-d1498eeeb22e
Stream Task ID 0_11 Partition number 11 Consumed JOINED STREAM_EVENT 90265a93-adac-4e93-856c-d1498eeeb22e
Stream Task ID 0_11 Partition number 11 Merged 90265a93-adac-4e93-856c-d1498eeeb22e
Я попытался следующие коннекторы конфигурации для преобразования идентификатора:
"name": "CONNECTOR",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",
"connection.url": "jdbc:mysql://localhost:3306/demo?user=rmoff&password=pw",
"table.whitelist": "CONNECTOR",
"mode": "timestamp",
"timestamp.column.name": "update_ts",
"validate.non.null": "false",
"transforms":"createKey,extractId, castString",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"id",
"transforms.extractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractId.field":"id",
"transforms.castString.type": "org.apache.kafka.connect.transforms.Cast$Key",
"transforms.castString.spec": "string",
"topic.prefix": "enrichment-"
}
}
Это идеально извлекает идентификатор, но отображается на неправильный раздел.Я также попробовал extractId вместо extractString, но произошло то же самое.Я не могу найти четкую документацию о том, как именно включить эти преобразования.
В двух словах проблема:
Мне нужно извлечь поле id из строки, сделать его ключом записи,и убедитесь, что он не действует иначе, чем использование производителя kafka для выполнения
KafkaProducer.produce("string key", event)
Если я заполняю обе темы с производителем, они оказываются в правильных разделах, но что-то о подключениисопоставляется с различными разделами, даже если это один и тот же ключ