Я пытаюсь использовать записи из таблицы MySQL, которая содержит 3 столбца (Axis, Price, lastname)
с их типами данных (int, decimal(14,4), varchar(50))
соответственно.
Я вставил одну запись со следующими данными (1, 5.0000, John)
.
Следующий код Java (который использует записи AVRO из темы, созданной MySQL, Connector на платформе Confluent) читает десятичный столбец: Price, как тип java.nio.HeapByteBuffer, поэтому я не могу достичь значения столбца когда я получу это.
Есть ли способ извлечь или преобразовать полученные данные в десятичный или двойной тип данных Java?
Вот файл свойств MySQL Connector: -
{
"name": "mysql-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"incrementing.column.name": "Axis",
"tasks.max": "1",
"table.whitelist": "ticket",
"mode": "incrementing",
"topic.prefix": "mysql-",
"name": "mysql-source",
"validate.non.null": "false",
"connection.url": "jdbc:mysql://localhost:3306/ticket?
user=user&password=password"
}
}
Вот код: -
public static void main(String[] args) throws InterruptedException,
IOException {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
String topic = "sql-ticket";
final Consumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
for (ConsumerRecord<String, GenericRecord> record : records) {
System.out.printf("value = %s \n", record.value().get("Price"));
}
}
} finally {
consumer.close();
}
}