Как программно записать текст в сокет Flink? - PullRequest
0 голосов
/ 04 мая 2020

Моя цель - отправить строку «УСПЕХ» в сокет и заставить Apache DataStream Флинка подобрать эту строку и обновить локальный текстовый файл словом «УСПЕХ». Я уже настроил « Потребитель». DataStream", который отслеживает текст, отправляемый с терминала на порт 9999 на моем локальном хосте. Пожалуйста, обратитесь к приведенному ниже коду для рабочего кода Consumer DataStream:

package p1;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class TestClientStreaming {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> textStream = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<String> filteredStream = textStream.filter(new FilterFunction<String>() {
            public boolean filter(String value) throws Exception {
                return value.equals("SUCCESS");
            }
        });

        filteredStream.writeAsText(<FILE-PATH>, WriteMode.OVERWRITE);

        env.execute("Reading Flink Stream");
    }
}

Это работает для меня; Когда я запускаю «n c -l 9999» на своем терминале, « текстовый файл потребителя », найденный в FILE-PATH, обновляется только в том случае, если я набираю «УСПЕХ» в терминале и нажимаю клавишу Enter , Пока отлично!

Теперь я бы хотел не использовать Терминал, но по-прежнему записывать текст в этот сокет, чтобы мой «Consumer DataStream» мог его забрать. Этот текст может быть любым текстом, так как мой «Consumer DataStream» отфильтрует текст для того, что он хочет, или «УСПЕХ».

Единственный способ узнать, как записать в сокет, это использовать терминал ( как обсуждалось ранее) или используя Producer DataStream , который постоянно читает из " Producer Text File " и вызывает DataStream.writeToSocket () для записи текста, который он читает, в сокет. Мой «Consumer Datastream» затем подхватит это и будет вести себя как положено.

Есть ли какая-либо другая опция, которая может быть сделана для записи текста в сокет, чтобы DataStream мог подобрать этот текст? Я попытался использовать библиотеку сокетов, чтобы написать «УСПЕХ» на localhost: 9999, но безрезультатно.

Это изображение может помочь визуализировать то, что я ищу, и то, что я уже решил: https://ibb.co/41GzNWM

Спасибо за ваше время!

1 Ответ

0 голосов
/ 04 мая 2020

Вам необходимо создать эквивалент nc -l aka сервера сокетов .

Пример кода:

public class SocketWriter {

    public static void main(String[] args) {
        int port = 9999;
        boolean stop = false;
        ServerSocket serverSocket = null;


        try {

            serverSocket = new ServerSocket(port);

            while (!stop) {
                Socket echoSocket = serverSocket.accept();
                PrintWriter out =
                        new PrintWriter(echoSocket.getOutputStream(), true);
                BufferedReader in =
                        new BufferedReader(
                                new InputStreamReader(echoSocket.getInputStream()));

                out.println("Hello Amazing");
                // do whatever logic you want.

                in.close();
                out.close();
                echoSocket.close();

            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        finally {
            try {
                serverSocket.close();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
}
...