когда я использую тот же ключ, Kafka JDBC connect не публикует сообщения в один раздел - PullRequest
0 голосов
/ 22 января 2019

Сообщения с одинаковым ключом должны отправляться в один и тот же раздел темы, но исходный соединитель Kafka JDBC публикует сообщение в другом разделе.

Я создал ТЕМУ (тему для студентов) с 5 разделами.

Я создал таблицу учеников, используя скрипт ниже:

create TABLE student (
  std_id INT AUTO_INCREMENT PRIMARY KEY,
  std_name VARCHAR(50),
  class_name VARCHAR(50),
  father_name VARCHAR(50),
  mother_name VARCHAR(50), 
  school VARCHAR(50)
);

Мой файл свойств JDBC для быстрого запуска приведен ниже

query= select * from student
task.max=1
mode=incrementing
incrementing.column.name=std_id
topic.prefix=student-topic-in
numeric.mapping=best_fit
timestamp.delay.interval.ms =10
transforms=CreateKey,ExtractKey,ConvertDate,Replace,InsertPartition,InsertTopic
transforms.CreateKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.CreateKey.fields=class_name
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractKey.field=class_name

когда я вставляю те же данные об ученике класса в таблицу БД, все сообщения публикуются в одном разделе.

student-topic-in 3 "15" @ 35: {"std_id":145,"std_name":"pranavi311","class_name":"15","father_name":"abcd1","mother_name":"efgh1","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "15" @ 36: {"std_id":146,"std_name":"pranavi321","class_name":"15","father_name":"abcd2","mother_name":"efgh2","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "15" @ 37: {"std_id":147,"std_name":"pranavi331","class_name":"15","father_name":"abcd3","mother_name":"efgh3","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "15" @ 38: {"std_id":148,"std_name":"pranavi341","class_name":"15","father_name":"abcd4","mother_name":"efgh4","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "15" @ 39: {"std_id":149,"std_name":"pranavi351","class_name":"15","father_name":"abcd5","mother_name":"efgh5","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "15" @ 40: {"std_id":150,"std_name":"pranavi361","class_name":"15","father_name":"abcd6","mother_name":"efgh6","school_name":"CSI","partition":null,"topic":"student-topic-in"}

% Достигнут конец темы студент-тема-в [3] по смещению 41

Но, если я вставлю информацию о разных учениках, она все равно будет опубликована в одном разделе.

student-topic-in 3 "11" @ 41: {"std_id":151,"std_name":"pranavi311","class_name":"11","father_name":"abcd1","mother_name":"efgh1","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "12" @ 42: {"std_id":152,"std_name":"pranavi321","class_name":"12","father_name":"abcd2","mother_name":"efgh2","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "13" @ 43: {"std_id":153,"std_name":"pranavi331","class_name":"13","father_name":"abcd3","mother_name":"efgh3","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "14" @ 44: {"std_id":154,"std_name":"pranavi341","class_name":"14","father_name":"abcd4","mother_name":"efgh4","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "15" @ 45: {"std_id":155,"std_name":"pranavi351","class_name":"15","father_name":"abcd5","mother_name":"efgh5","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 0 "16" @ 31: {"std_id":156,"std_name":"pranavi361","class_name":"16","father_name":"abcd6","mother_name":"efgh6","school_name":"CSI","partition":null,"topic":"student-topic-in"}

% Достигнут конец темы студент-тема-в [3] по смещению 46

Я использую команду ниже, чтобы напечатать детали.

kafkacat -b localhost:9092 -C -t student-topic-in -f '%t %p %k @ %o: %s\n' 

Я ожидаю, что сообщения учеников каждого класса должны публиковаться в одном конкретном разделе (в соединителе JDBC я назначаю имя класса в качестве ключа), но это не работает.

Что именно мне не хватает? как опубликовать каждый класс учеников в определенный раздел?

Ответы [ 2 ]

0 голосов
/ 22 января 2019

В вашем случае все работает правильно.

Если вы проверите исходный код Kafka Connect, вы увидите в методе WorkerSourceTask::sendRecords, что к каждой записи перед отправкой Producer применяются преобразования, а затем сообщение преобразуется.в массив байтов на Converter

private boolean sendRecords() {
    ...
    final SourceRecord record = transformationChain.apply(preTransformRecord);
    final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record); 
    ...
}

В вашем случае преобразования: CreateKey,ExtractKey,ConvertDate,Replace,InsertPartition,InsertTopic, а преобразователь - org.apache.kafka.connect.json.JsonConverter

Преобразователь сопоставляет ваш ключ со схемой в массив байтов,то есть для отправки в Kafka.

@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
    JsonNode jsonValue = enableSchemas ? convertToJsonWithEnvelope(schema, value) : convertToJsonWithoutEnvelope(schema, value);
    try {
        return serializer.serialize(topic, jsonValue);
    } catch (SerializationException e) {
        throw new DataException("Converting Kafka Connect data to byte[] failed due to serialization error: ", e);
    }
}

Вы отключили схему, поэтому для ваших ключей следует следующий вызов с результатами:

  • 11 serializer.serialize(topic,new TextNode("11")) = [34,49,49,34]
  • 12 serializer.serialize(topic,new TextNode("12")) = [34,49,50,34]
  • 13 serializer.serialize(topic,new TextNode("13")) = [34,49,51,34]
  • 14 serializer.serialize(topic,new TextNode("14")) = [34,49,52,34]
  • 15 serializer.serialize(topic,new TextNode("15")) = [34,49,53,34]
  • 16 serializer.serialize(topic,new TextNode("16")) = [34,49,54,34]

Каждое сообщение отправляется Producer в некоторый раздел.На какой раздел будет отправлено сообщение, зависит от Partitioner (org.apache.kafka.clients.producer.Partitioner).Kafka Connect использует значение по умолчанию - org.apache.kafka.clients.producer.internals.DefaultPartitioner

Под капотом DefaultPartitioner использует следующую функцию для расчета раздела: org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

Если вы применяете к своим параметрам (5 разделов и массивовбайт ваших ключей), вы получите следующее:

  • Utils.toPositive(Utils.murmur2(new byte[]{34,49,49,34})) % 5 = 3
  • Utils.toPositive(Utils.murmur2(new byte[]{34,49,50,34})) % 5 = 3
  • Utils.toPositive(Utils.murmur2(new byte[]{34,49,51,34})) % 5 = 3
  • Utils.toPositive(Utils.murmur2(new byte[]{34,49,52,34})) % 5 = 3
  • Utils.toPositive(Utils.murmur2(new byte[]{34,49,53,34})) % 5 = 3
  • Utils.toPositive(Utils.murmur2(new byte[]{34,49,54,34})) % 5 = 0

Надеюсь, что более-менее объясним, что ипочему

0 голосов
/ 22 января 2019

Я решил эту проблему с помощью String converter key.converter=org.apache.kafka.connect.storage.StringConverter

...