ReadLg.java
package com.example.dev1.controller;
import java.io.BufferedReader;
import java.io.FileReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ReadLg {
public static void main(String[] args) {
String conn = "jdbc:mysql://10.247.36.174:3306/d_accesslog?useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC";
String username = "*****";
String pwd = "*****";
String sql = "INSERT INTO t_weblogic_test (RawData) values (?)";
Properties props = new Properties();
props.put("bootstrap.servers", "10.247.36.174:3306");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = null;
String[] levParam = { "<DEBUG>", "<WARN", "<INFO", "<ERROR>", "INFO", "WARN", "DEBUG", "ERROR" };
List<String> listparam = Arrays.asList(levParam);
try (Connection con = DriverManager.getConnection(conn, username, pwd);
PreparedStatement ps = con.prepareStatement(sql);
BufferedReader br = new BufferedReader(new FileReader("All.log"));) {
String line = null;
while ((line = br.readLine()) != null) {
String firstWord = line.split(" ", 2)[0];
if (listparam.contains(firstWord)) {
processMessages(line, br, listparam, ps);
break;
} else {
System.out.println("Failed!!");
}
}
} catch (Exception ex) {
System.err.println("Error in \n" + ex);
} finally {
producer = new KafkaProducer<>(props);
String msg = "Done";
producer.send(new ProducerRecord<String, String>("HelloKafkaTopic", msg));
System.out.println("Sent: " + msg);
}
producer.close();
}
public static void processMessages(String line, BufferedReader br, List<String> listparam, PreparedStatement ps)
throws Exception {
// PrintWriter out = new PrintWriter(new BufferedWriter(
// new OutputStreamWriter(new FileOutputStream(java.io.FileDescriptor.out), "UTF-8"), 512));
StringBuilder message = new StringBuilder();
message.append(line);
// String res = message.toString();
while ((line = br.readLine()) != null) {
String firstWord = line.split(" ", 2)[0];
if (listparam.contains(firstWord)) {
ps.setString(1, message.toString());
ps.executeUpdate();
// res = line;
message.setLength(0);
message.append(line);
} else {
message.append("\n" + line);
// res = message.toString();
}
}
if (message.length() > 0) {
ps.setString(1, message.toString());
ps.executeUpdate();
}
/*
* out.flush(); out.close();
*/
}
}
RetrieveData.java
package com.example.dev2.controller;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.sql.*;
public class RetrieveData {
@SuppressWarnings({ "resource", "deprecation" })
public static void main(String[] args) {
String conn = "jdbc:mysql://ipaddress/d_accesslog?useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC";
String username = "****";
String pwd = "****";
String sql = "SELECT * from t_weblogic_test";
Properties props = new Properties();
props.put("bootstrap.servers", "ipaddress");
props.put("group.id", "group-1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("HelloKafkaTopic"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
if (record.value().equals("Done")) {
try (Connection con = DriverManager.getConnection(conn, username, pwd);
PreparedStatement ps = con.prepareStatement(sql);) {
ResultSet rs = con.createStatement().executeQuery(sql);
while (rs.next()) {
String rawData = rs.getString("RawData");
System.out.println(rawData);
}
con.createStatement().close();
} catch (Exception ex) {
System.err.println("Error in \n" + ex);
}
}
}
}
}
}
Я новичок в Kafka. может кто-нибудь сказать мне, в какой части я ошибся? Я не знаю, как использовать Кафку в Java. в ReadLg.java я хочу прочитать из файла журнала и вставить его в БД, а затем, когда он закончил, я хочу отправить сообщение в RetrieveData.java, чтобы он мог начать. извлеченные данные будут запущены, но бездействующие в ожидании сообщения из ReadLg.java. Это плохой подход? или старый подход? какие-либо предложения или помощь по исправлению этого? Я получаю сообщение об ошибке не могу подключиться к IP-адресу
Ниже приведено сообщение об ошибке:
14:23:00.482 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node -1 disconnected.
14:23:00.482 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.532 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.583 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.633 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.683 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.733 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.784 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.834 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.885 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.935 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.985 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.036 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.086 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.137 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.187 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.238 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.289 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.339 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.389 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.439 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.490 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node 10.247.36.174:3306 (id: -1 rack: null) for sending metadata request
14:23:01.490 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node 10.247.36.174:3306 (id: -1 rack: null)
14:23:02.012 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
14:23:02.012 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
14:23:02.012 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node -1.
14:23:02.520 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Connection with /10.247.36.174 disconnected
java.io.EOFException: null