Написание программы Flink
для записи потребляемых данных от Kafka
до HDFS
.
HDFS
работает нормально, я могу поместить данные в целевой каталог, и каталог имеет полный доступ. Внутри /etc/hadoop/conf/hdfs-site.xml
:
<property>
<name>dfs.namenode.servicerpc-address</name>
<value>namenode_ip:8022</value>
</property>
Я могу пропинговать namenode_ip
, а Flink
может потреблять и отображать данные с Kafka
. Мои Flink
коды:
...
FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>(
topic, new SimpleStringSchema(), properties);
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
DataStream<String> stream = env.addSource(myConsumer);
BucketingSink bk=new BucketingSink("hdfs://namenode_ip:8022/tmp/demodir");
bk.setInactiveBucketCheckInterval(1L);
stream.addSink(bk);
Но когда он пытается записать данные в HDFS
, происходит сбой:
Caused by: java.net.ConnectException: Call From HSH-D-3218/192.168.97.47 to 59.111.60.158:8022 failed on connection exception: java.net.ConnectException: Connection refused: no further information; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:801)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1485)
at org.apache.hadoop.ipc.Client.call(Client.java:1427)
at org.apache.hadoop.ipc.Client.call(Client.java:1337)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy25.create(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:293)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:398)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:335)
at com.sun.proxy.$Proxy26.create(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:246)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1257)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1199)
at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:472)
at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:469)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:469)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:410)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:928)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:806)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:795)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.reflectTruncate(BucketingSink.java:635)
... 9 more
Caused by: java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:495)
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:681)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:777)
at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:409)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1542)
at org.apache.hadoop.ipc.Client.call(Client.java:1373)
... 37 more
Почему это произошло и как это исправить?