Как я сказал в комментарии, вы должны хранить ваше соединение в некоторой статической переменной, потому что ваши Sources- и Sinks не будут использовать одно и то же соединение в противном случае.Вы также должны убедиться, что ваш Source и Sink работают на одной и той же JVM, используя один и тот же Classloader, иначе у вас все равно будет более одного соединения.
Я создал этот класс-обертку, который содержит необработанные Socket-Connection и Reader/ Экземпляр Writer для этого соединения.Поскольку ваш источник всегда останавливается до того, как ваш приемник (так работает Flink), этот класс также восстанавливает соединение, если он был закрыт ранее.
package example;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;
public class SocketConnection implements Closeable {
private final String host;
private final int port;
private final Object lock;
private volatile Socket socket;
private volatile BufferedReader reader;
private volatile PrintStream writer;
public SocketConnection(String host, int port) {
this.host = host;
this.port = port;
this.lock = new Object();
this.socket = null;
this.reader = null;
this.writer = null;
}
private void connect() throws IOException {
this.socket = new Socket(this.host, this.port);
this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
this.writer = new PrintStream(this.socket.getOutputStream());
}
private void ensureConnected() throws IOException {
// only acquire lock if null
if (this.socket == null) {
synchronized (this.lock) {
// recheck if socket is still null
if (this.socket == null) {
connect();
}
}
}
}
public BufferedReader getReader() throws IOException {
ensureConnected();
return this.reader;
}
public PrintStream getWriter() throws IOException {
ensureConnected();
return this.writer;
}
@Override
public void close() throws IOException {
if (this.socket != null) {
synchronized (this.lock) {
if (this.socket != null) {
this.reader.close();
this.reader = null;
this.writer.close();
this.writer = null;
this.socket.close();
this.socket = null;
}
}
}
}
}
Ваш основной класс (или любой другой класс) содержит один экземпляр этогокласс, к которому затем обращается ваш источник и ваш приемник:
package example;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Main {
public static final SocketConnection CONNECTION = new SocketConnection("your-host", 12345);
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new SocketTextStreamSource())
.addSink(new SocketTextStreamSink());
env.execute("Flink Streaming Scala API Skeleton");
}
}
Ваша SourceFunction может выглядеть примерно так:
package example;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class SocketTextStreamSource implements SourceFunction<String> {
private volatile boolean running;
public SocketTextStreamSource() {
this.running = true;
}
@Override
public void run(SourceContext<String> context) throws Exception {
try (SocketConnection conn = Main.CONNECTION) {
String line;
while (this.running && (line = conn.getReader().readLine()) != null) {
context.collect(line);
}
}
}
@Override
public void cancel() {
this.running = false;
}
}
И ваша SinkFunction:
package example;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
public class SocketTextStreamSink extends RichSinkFunction<String> {
private transient SocketConnection connection;
@Override
public void open(Configuration parameters) throws Exception {
this.connection = Main.CONNECTION;
}
@Override
public void invoke(String value, Context context) throws Exception {
this.connection.getWriter().println(value);
this.connection.getWriter().flush();
}
@Override
public void close() throws Exception {
this.connection.close();
}
}
Обратите внимание, что я всегда использую getReader()
и getWriter()
, потому что основной сокет может быть закрыт в это время.