Отправить Кафка строковое сообщение Java для запуска программы на других Java - PullRequest
0 голосов
/ 09 октября 2019

ReadLg.java

package com.example.dev1.controller;

import java.io.BufferedReader;
import java.io.FileReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ReadLg {
    public static void main(String[] args) {
        String conn = "jdbc:mysql://10.247.36.174:3306/d_accesslog?useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC";
        String username = "*****";
        String pwd = "*****";
        String sql = "INSERT INTO t_weblogic_test (RawData) values (?)";

        Properties props = new Properties();
        props.put("bootstrap.servers", "10.247.36.174:3306");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = null;

        String[] levParam = { "<DEBUG>", "<WARN", "<INFO", "<ERROR>", "INFO", "WARN", "DEBUG", "ERROR" };
        List<String> listparam = Arrays.asList(levParam);

        try (Connection con = DriverManager.getConnection(conn, username, pwd);
                PreparedStatement ps = con.prepareStatement(sql);
                BufferedReader br = new BufferedReader(new FileReader("All.log"));) {

            String line = null;
            while ((line = br.readLine()) != null) {
                String firstWord = line.split(" ", 2)[0];
                if (listparam.contains(firstWord)) {
                    processMessages(line, br, listparam, ps);
                    break;
                } else {
                    System.out.println("Failed!!");
                }
            }
        } catch (Exception ex) {
            System.err.println("Error in \n" + ex);
        } finally {
            producer = new KafkaProducer<>(props);
            String msg = "Done";
            producer.send(new ProducerRecord<String, String>("HelloKafkaTopic", msg));
            System.out.println("Sent: " + msg);

        }
        producer.close();

    }

    public static void processMessages(String line, BufferedReader br, List<String> listparam, PreparedStatement ps)
            throws Exception {
//      PrintWriter out = new PrintWriter(new BufferedWriter(
//              new OutputStreamWriter(new FileOutputStream(java.io.FileDescriptor.out), "UTF-8"), 512));

        StringBuilder message = new StringBuilder();
        message.append(line);
        // String res = message.toString();
        while ((line = br.readLine()) != null) {
            String firstWord = line.split(" ", 2)[0];
            if (listparam.contains(firstWord)) {
                ps.setString(1, message.toString());
                ps.executeUpdate();
                // res = line;
                message.setLength(0);
                message.append(line);
            } else {
                message.append("\n" + line);
                // res = message.toString();
            }
        }

        if (message.length() > 0) {
            ps.setString(1, message.toString());
            ps.executeUpdate();
        }
        /*
         * out.flush(); out.close();
         */
    }
}

RetrieveData.java

package com.example.dev2.controller;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.sql.*;

public class RetrieveData {

    @SuppressWarnings({ "resource", "deprecation" })
    public static void main(String[] args) {
        String conn = "jdbc:mysql://ipaddress/d_accesslog?useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC";
        String username = "****";
        String pwd = "****";
        String sql = "SELECT * from t_weblogic_test";
        Properties props = new Properties();

        props.put("bootstrap.servers", "ipaddress");
        props.put("group.id", "group-1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Arrays.asList("HelloKafkaTopic"));

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                if (record.value().equals("Done")) {
                    try (Connection con = DriverManager.getConnection(conn, username, pwd);
                            PreparedStatement ps = con.prepareStatement(sql);) {
                        ResultSet rs = con.createStatement().executeQuery(sql);
                        while (rs.next()) {
                            String rawData = rs.getString("RawData");
                            System.out.println(rawData);
                        }
                        con.createStatement().close();
                    } catch (Exception ex) {
                        System.err.println("Error in \n" + ex);
                    }
                }
            }
        }
    }
}

Я новичок в Kafka. может кто-нибудь сказать мне, в какой части я ошибся? Я не знаю, как использовать Кафку в Java. в ReadLg.java я хочу прочитать из файла журнала и вставить его в БД, а затем, когда он закончил, я хочу отправить сообщение в RetrieveData.java, чтобы он мог начать. извлеченные данные будут запущены, но бездействующие в ожидании сообщения из ReadLg.java. Это плохой подход? или старый подход? какие-либо предложения или помощь по исправлению этого? Я получаю сообщение об ошибке не могу подключиться к IP-адресу
Ниже приведено сообщение об ошибке:

14:23:00.482 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node -1 disconnected.
14:23:00.482 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.532 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.583 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.633 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.683 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.733 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.784 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.834 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.885 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.935 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:00.985 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.036 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.086 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.137 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.187 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.238 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.289 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.339 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.389 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.439 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
14:23:01.490 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node 10.247.36.174:3306 (id: -1 rack: null) for sending metadata request
14:23:01.490 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node 10.247.36.174:3306 (id: -1 rack: null)
14:23:02.012 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
14:23:02.012 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
14:23:02.012 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node -1.
14:23:02.520 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Connection with /10.247.36.174 disconnected
java.io.EOFException: null

1 Ответ

1 голос
/ 10 октября 2019

Подводя итоги: чтобы сначала использовать Kafka Consumer / Producer, вы должны запустить Zookeeper и Kafka broker.

Для тестирования или разработки вы можете запустить его самостоятельно, используя:

  1. документация: https://kafka.apache.org/documentation/#quickstart_startserver
  2. Изображение Docker: https://hub.docker.com/r/wurstmeister/kafka

Если ваш Kafka готов, вы можете начать его использовать. Вы должны помнить, чтобы установить правильное значение для bootstrap.server (для локального использования обычно это localhost:9092)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...