Соединитель преобразует неправильно карты ключ раздела - PullRequest
0 голосов
/ 19 января 2019

У меня есть простая таблица базы данных в MySQL, которая имеет (id varchar(255), val varchar(255), ..., ...).

Я настроил Kafka Connect для потоковой передачи таблицы в тему (CONNECT_TOPIC) с двадцатью разделами.У меня есть другая тема (STREAM_TOPIC), которая заполнена производителем kafka с двадцатью разделами.

Проблема заключается в том, что ключи из карты соединителей для разных разделов в CONNECTOR_TOPIC отличаются от ключей в STREAM_TOPIC.Это означает, что я не могу присоединиться к двум темам в потоке.Я полагаю, что это потому, что идентификатор извлекается неправильно.

Вот пример этого вывода:

Stream Task ID 0_13 Partition number 13 Consumed CONNECT_EVENT 68f52084-cfc9-4997-a28e-57cfd4f7bbbf
Stream Task ID 0_13 Partition number 13 Consumed JOINED CONNECT_EVENT 68f52084-cfc9-4997-a28e-57cfd4f7bbbf
Stream Task ID 0_17 Partition number 17 Consumed STREAM_EVENT 68f52084-cfc9-4997-a28e-57cfd4f7bbbf
Stream Task ID 0_17 Partition number 17 Consumed JOINED STREAM_EVENT 68f52084-cfc9-4997-a28e-57cfd4f7bbbf

Stream Task ID 0_7 Partition number 7 Consumed STREAM_EVENT 32aaa88d-b175-4a54-8338-d542ed051e6a
Stream Task ID 0_7 Partition number 7 Consumed JOINED STREAM_EVENT 32aaa88d-b175-4a54-8338-d542ed051e6a
Stream Task ID 0_17 Partition number 17 Consumed CONNECT_EVENT 32aaa88d-b175-4a54-8338-d542ed051e6a
Stream Task ID 0_17 Partition number 17 Consumed JOINED CONNECT_EVENT 32aaa88d-b175-4a54-8338-d542ed051e6a

Stream Task ID 0_11 Partition number 11 Consumed CONNECT_EVENT 90265a93-adac-4e93-856c-d1498eeeb22e
Stream Task ID 0_11 Partition number 11 Consumed JOINED CONNECT_EVENT 90265a93-adac-4e93-856c-d1498eeeb22e
Stream Task ID 0_11 Partition number 11 Consumed STREAM_EVENT 90265a93-adac-4e93-856c-d1498eeeb22e
Stream Task ID 0_11 Partition number 11 Consumed JOINED STREAM_EVENT 90265a93-adac-4e93-856c-d1498eeeb22e
Stream Task ID 0_11 Partition number 11 Merged 90265a93-adac-4e93-856c-d1498eeeb22e

Я попытался следующие коннекторы конфигурации для преобразования идентификатора:

        "name": "CONNECTOR",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",

                "key.converter": "org.apache.kafka.connect.json.JsonConverter",

                "key.converter.schemas.enable":"false",

                "value.converter": "org.apache.kafka.connect.json.JsonConverter",
                "value.converter.schemas.enable":"false",


                "connection.url": "jdbc:mysql://localhost:3306/demo?user=rmoff&password=pw",

                "table.whitelist": "CONNECTOR",

                "mode": "timestamp",

                "timestamp.column.name": "update_ts",

                "validate.non.null": "false",

                "transforms":"createKey,extractId, castString",
                "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
                "transforms.createKey.fields":"id",
                "transforms.extractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
                "transforms.extractId.field":"id",
                "transforms.castString.type": "org.apache.kafka.connect.transforms.Cast$Key",
                "transforms.castString.spec": "string",

                "topic.prefix": "enrichment-"
        }
}

Это идеально извлекает идентификатор, но отображается на неправильный раздел.Я также попробовал extractId вместо extractString, но произошло то же самое.Я не могу найти четкую документацию о том, как именно включить эти преобразования.

В двух словах проблема:

Мне нужно извлечь поле id из строки, сделать его ключом записи,и убедитесь, что он не действует иначе, чем использование производителя kafka для выполнения

KafkaProducer.produce("string key", event)

Если я заполняю обе темы с производителем, они оказываются в правильных разделах, но что-то о подключениисопоставляется с различными разделами, даже если это один и тот же ключ

...