Использование запроса к базе данных кассандры в качестве источника для программы Flink - PullRequest
0 голосов
/ 27 декабря 2018

У меня есть база данных Cassandra, которая должна получать свои данные в моей программе Flink из сокета, например steam, для Streamprocessing.Итак, я написал простую клиентскую программу, которая считывала данные из Cassandra и отправляла данные в сокет, а также я написал программу Flink на серверной базе. На самом деле моя клиентская программа проста и не использует никаких инструкций Flink, она простоотправить строку Cassandra в формате строки в сокет, и сервер должен получить строку.Сначала я запускаю программу Flink для прослушивания клиента, а затем запускаю клиентскую программу.Клиент получил этот поток с сервера (так как сервер отправляет данные потока данных, а клиент не может получить его правильно):

Привет клиент org.apache.flink.streaming.api.datastream.DataStreamSource@68c72235

После этого обе программы продолжают работать без отправки и получения каких-либо данных, и ошибки не возникает.

Программа Flink имеет следующий вид: открытый класс WordCount_in_cassandra {

 private static int myport=9999;
 private static String hostname="localhost";
 //static ServerSocket variable
 private static ServerSocket server;
 private static int count_row=0;

 public static void main(String[] args) throws Exception {
 // Checking input parameters
 final ParameterTool params = ParameterTool.fromArgs(args);
 // set up the execution environment
 final StreamExecutionEnvironment env = 
 StreamExecutionEnvironment.getExecutionEnvironment();

 //create the socket server object
    server = new ServerSocket(myport);
 // make parameters available in the web interface
    env.getConfig().setGlobalJobParameters(params);

    while (true){
        System.out.println("Waiting for client request");
        //creating socket and waiting for client connection
        Socket socket = server.accept();
        DataStream<String> stream = env.socketTextStream(hostname, 
        myport);

        stream.print();

        //write object to Socket
        oos.writeObject("Hi Client " + stream.toString());
        oos.close();
        socket.close();

        // parse the data, group it, window it, and aggregate the 
        counts
    DataStream<Tuple2<String, Long>> counts = stream
                .flatMap(new FlatMapFunction<String, Tuple2<String, 
    Long>>() {
                    @Override
            public void flatMap(String value, 
     Collector<Tuple2<String, Long>> out) {
                        // normalize and split the line
           String[] words = value.toLowerCase().split("\\W+");

                        // emit the pairs
             for (String word : words) {

                if (!word.isEmpty()) {
                   out.collect(new Tuple2<String, Long>(word, 1L));
                            }
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use -- 
            output to specify output path.");

            counts.print();
        }

        //terminate the server if client sends exit request
        if (stream.equals("exit")){
            System.out.println("row_count : "+count_row);
            break;
        }

        // execute program
        env.execute("Streaming WordCount");
    }//while true
    System.out.println("Shutting down Socket server!!");
    server.close();
     }//main
   }

Клиентская программа выглядит следующим образом:

public class client_code {
private static Cluster cluster = 
  Cluster.builder().addContactPoint("127.0.0.1")
 .withPort(9042).build();
private static Session session = cluster.connect("mar1");

 public static void main(String[] args) throws UnknownHostException, 
   IOException, ClassNotFoundException, InterruptedException {
    String serverIP = "localhost";
    int port=9999;
    Socket socket = null;
    ObjectOutputStream oos = null;
    ObjectInputStream ois = null;

    ResultSet result = session.execute("select * from tlbtest15");
    for (Row row : result) {
        //establish socket connection to server
        socket = new Socket(serverIP, port);
        //write to socket using ObjectOutputStream
        oos = new ObjectOutputStream(socket.getOutputStream());
        System.out.println("Sending request to Socket Server");

        if (row==result) oos.writeObject("exit");
        else oos.writeObject(""+row+"");
        //read the server response message
        ois = new ObjectInputStream(socket.getInputStream());
        String message = (String) ois.readObject();
        System.out.println("Message: " + message);
        //close resources
        ois.close();
        oos.close();
        Thread.sleep(100);
    }

    cluster.close();
 }
}

Подскажите, пожалуйста, как я могу решить мою проблему?

Буду признателен за любую помощь.

1 Ответ

0 голосов
/ 27 декабря 2018

Есть несколько проблем, связанных с тем, как вы пытались создать приложение Flink.Несколько комментариев:

  • Flink DataStream API используется для описания графа потока данных, который отправляется в кластер для выполнения при вызове env.execute ().Не имеет смысла заключать это в цикл while(true).
  • socketTextStream устанавливает клиентское соединение.Ваш сервер, кажется, не делает ничего полезного.
  • stream.equals("exit") - поток - это DataStream, а не String.Если вы хотите сделать что-то особенное, когда элемент потока имеет определенное значение, это нужно сделать по-другому, используя одну из потоковых операций, которая выполняет обработку по событию за раз.Что касается выключения задания Flink, потоковые задания обычно рассчитаны либо на неопределенное время, либо на выполнение до тех пор, пока конечный источник ввода не достигнет своего конца, и в этот момент они отключаются самостоятельно.

Вы можете значительно упростить вещи.Я бы начал сначала и заменил вашего клиента командной строкой, подобной этой:

cqlsh -e "SELECT * from tlbtest15;" | nc -lk 9999

nc (netcat) в этом случае будет действовать как сервер, позволяя Flink быть клиентом.Это упростит задачу, поскольку именно так предполагается использовать env.socketTextTream.

Тогда вы сможете обработать результаты с помощью обычного приложения Flink.SocketTextStream создаст поток, содержащий результаты запроса в виде строк текста, по одной для каждой строки.

...