When I wanted to write compressed string data to HDFS, I found that Flink only provides a StringWriter, so I wrote my own writer, shown below:
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Objects;

import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;

public class StringCompressWriter<T> extends StreamWriterBase<T> {

    private static final long serialVersionUID = 1L;

    private String charsetName;

    private transient Charset charset;
    private transient CompressionOutputStream outStream;

    public StringCompressWriter() {
        this("UTF-8");
    }

    public StringCompressWriter(String charsetName) {
        this.charsetName = charsetName;
    }

    protected StringCompressWriter(StringCompressWriter<T> other) {
        super(other);
        this.charsetName = other.charsetName;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        this.charset = Charset.forName(charsetName);
        // Wrap the part file's FSDataOutputStream in a gzip compression stream
        Configuration conf = fs.getConf();
        CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
        CompressionCodec codec = codecFactory.getCodecByName("GzipCodec");
        FSDataOutputStream dataOutputStream = getStream();
        Compressor compressor = CodecPool.getCompressor(codec, fs.getConf());
        outStream = codec.createOutputStream(dataOutputStream, compressor);
    }

    @Override
    public void write(T element) throws IOException {
        getStream(); // throws if the stream is not open
        outStream.write(element.toString().getBytes(charset));
        outStream.write('\n');
    }

    @Override
    public void close() throws IOException {
        if (outStream != null) {
            // closing the compression stream also closes the wrapped FSDataOutputStream
            outStream.close();
            // outStream = null;
        }
        super.close();
    }

    @Override
    public Writer<T> duplicate() {
        return new StringCompressWriter<>(this);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), charsetName);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (other == null) {
            return false;
        }
        if (getClass() != other.getClass()) {
            return false;
        }
        @SuppressWarnings("unchecked")
        StringCompressWriter<T> writer = (StringCompressWriter<T>) other;
        // field comparison
        return Objects.equals(charsetName, writer.charsetName)
                && super.equals(other);
    }
}
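Roughly, the writer is wired into a BucketingSink like this (a minimal sketch, not my exact job code; the base path, bucket date format, and the "stream" variable are assumptions inferred from the paths and job name in the logs below):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

// Sketch only: "stream" stands for the job's DataStream<String>; the path and
// date format are guesses based on /tmp/Test/2018-08-27 in the output below.
DataStream<String> stream = ...;
BucketingSink<String> sink = new BucketingSink<>("/tmp/Test");
sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd"));
sink.setWriter(new StringCompressWriter<>());
stream.addSink(sink);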
But when I run my application on YARN, the TaskManager always throws the following error:
2018-09-03 15:25:54,187 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: bucketSink (10/15) (67b40f43fc72371f19e61a6ac3f60819) switched from RUNNING to FAILED.
java.nio.channels.ClosedChannelException
at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1618)
at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1982)
at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1942)
at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
at org.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:83)
at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:99)
at com.vipkid.bigdata.sink.StringCompressWriter.close(StringCompressWriter.java:73)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:570)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
2018-09-03 15:25:54,191 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job TestBucketing (01e7088de6b4f9cbe22d9f6a3fdbd2fe) switched from state RUNNING to FAILING.
Moreover, when I looked at the data, I found that the output stream does not seem to have been closed properly:
hdfs dfs -text /tmp/Test/2018-08-27/part-8-96 | wc -l
text: Unexpected end of ZLIB input stream
3268
Can anyone tell me what happened?