Here is what I did:
1. Download flink-1.6.1-bin-scala_2.11.tgz and unpack it.
2. Open Eclipse, create a new Java project with JDK 1.8, and add the jars from the lib directory of flink-1.6.1-bin-scala_2.11 to the project's classpath.
3. Create an empty custom source:
package flinkSrc;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Empty custom source: every method is a no-op, so run() returns at once and emits nothing.
public class ShopSourceFromMongo extends RichSourceFunction<String> {
    @Override
    public void cancel() {}

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {}

    @Override
    public void open(Configuration parameters) throws Exception {}

    @Override
    public void close() throws Exception {}
}
4. Write a main class to run it:
package flinkSrc;

import java.util.Iterator;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Main2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> shops = env.addSource(new ShopSourceFromMongo());
        Iterator<String> i = DataStreamUtils.collect(shops);
        while (i.hasNext()) {
            System.out.println(i.next());
        }
        env.execute();
    }
}
When I run the main method, I get this error:
log4j:WARN No appenders could be found for logger (org.apache.flink.streaming.api.graph.StreamGraphGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.io.IOException: Cannot connect to the client to send back the stream
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
at flinkSrc.Main2.main(Main2.java:23)
Caused by: java.io.IOException: Cannot connect to the client to send back the stream
at org.apache.flink.streaming.experimental.CollectSink.open(CollectSink.java:85)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.connect0(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:79)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at java.net.Socket.<init>(Socket.java:434)
at java.net.Socket.<init>(Socket.java:244)
at org.apache.flink.streaming.experimental.CollectSink.open(CollectSink.java:80)
... 7 more
What is wrong? My real code is more complex, but it fails with the same error, so I stripped it down to the minimal example above.
The jars I added:
flink-1.6.1-bin-scala_2.11\flink-1.6.1\lib\flink-dist_2.11-1.6.1.jar
flink-1.6.1-bin-scala_2.11\flink-1.6.1\lib\flink-python_2.11-1.6.1.jar
flink-1.6.1-bin-scala_2.11\flink-1.6.1\lib\log4j-1.2.17.jar
flink-1.6.1\lib\slf4j-log4j12-1.7.7.jar
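One possible reading of the trace above, offered as an assumption rather than a confirmed diagnosis: DataStreamUtils.collect appears to open a listening socket on the client side and start the job on its own, so the later env.execute() would run the job a second time, after that socket is already closed, and CollectSink.open would then fail with "Connection refused". The sketch below reproduces only that socket pattern with plain java.net; RefusedAfterClose and canConnect are hypothetical names used for illustration and are not part of Flink.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo class, not Flink code: shows that connecting to a port
// whose listener has been closed fails with ConnectException.
class RefusedAfterClose {

    // Returns true if a TCP connection to host:port can be opened,
    // false if the connection is refused.
    public static boolean canConnect(InetAddress host, int port) throws IOException {
        try (Socket s = new Socket(host, port)) {
            return true;
        } catch (ConnectException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();

        // Stands in for the client-side listener (assumed to be what collect() opens).
        ServerSocket listener = new ServerSocket(0, 1, loopback);
        int port = listener.getLocalPort();

        // "First run": the listener is still open, so the sink side can connect.
        System.out.println("listener open, connect ok: " + canConnect(loopback, port));

        // The listener goes away once the first run has finished.
        listener.close();

        // "Second run": nothing listens on the port any more.
        System.out.println("listener closed, connect ok: " + canConnect(loopback, port));
    }
}
```

If this reading is right, the first thing to try would be removing the final env.execute() call from Main2 and relying on the iterator returned by DataStreamUtils.collect alone.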