Ошибка при использовании пустого пользовательского источника в Flink - PullRequest
0 голосов
/ 26 октября 2018

Что я делаю, это:

1. Скачайте flink-1.6.1-bin-scala_2.11.tgz и распакуйте

2.открыть затмение, создать новый проект java с jdk 1.8, добавить jar в направлении lib в flink-1.6.1-bin-scala_2.11 в classpath к проекту

3.создать пустой пользовательский источник:

package flinkSrc;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;


public class ShopSourceFromMongo extends RichSourceFunction<String>{
    public void cancel() {}

    public void run(SourceContext<String> sourceContext) throws Exception {}

    @Override
    public void open(Configuration parameters) throws Exception {

    }

    @Override
    public void close() throws Exception {}



}

4. напишите основную для запуска:

package flinkSrc;

import java.util.Iterator;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class Main2 {
    public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

           DataStreamSource<String> shops = env.addSource(new ShopSourceFromMongo());

           Iterator<String> i = DataStreamUtils.collect(shops);
           while(i.hasNext()){
               System.out.println(i.next());
           }

            env.execute();
    }
}

Когда я запускаю метод main, я получаю сообщение об ошибке:

log4j:WARN No appenders could be found for logger (org.apache.flink.streaming.api.graph.StreamGraphGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.io.IOException: Cannot connect to the client to send back the stream
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
    at flinkSrc.Main2.main(Main2.java:23)
Caused by: java.io.IOException: Cannot connect to the client to send back the stream
    at org.apache.flink.streaming.experimental.CollectSink.open(CollectSink.java:85)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.connect0(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:79)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at java.net.Socket.<init>(Socket.java:434)
    at java.net.Socket.<init>(Socket.java:244)
    at org.apache.flink.streaming.experimental.CollectSink.open(CollectSink.java:80)
    ... 7 more

Что не так? На самом деле я пишу сложный код, но он получил ту же ошибку, поэтому я удаляю весь код.

Я добавил банки:

flink-1.6.1-bin-scala_2.11\flink-1.6.1\lib\flink-dist_2.11-1.6.1.jar
flink-1.6.1-bin-scala_2.11\flink-1.6.1\lib\flink-python_2.11-1.6.1.jar
flink-1.6.1-bin-scala_2.11\flink-1.6.1\lib\log4j-1.2.17.jar
flink-1.6.1\lib\slf4j-log4j12-1.7.7.jar
...