Я хочу записать данные из Kafka в базу данных MySQL. Я реализовал следующий код.
kafkaConsumer.subscribe(Arrays.asList(topicName));
try {
while (true) {
ConsumerRecords<String, String> record = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, String> record1 : record) {
if (record1.value().length() > 0) {
System.out.println(record1.value());
String value = record1.value();
String[] array = value.split(" ");
String sql = String.format("insert into data(timestamp, LogLevel,CityName,Detail) values ('%s', '%s','%s','%s')", array[0], array[1], array[2], array[3]);
mysqlConnector.statement.executeUpdate(sql);
}
}
}