
Flink Rolling File Exception

asked 03 September 2018

When I want to write compressed string data to HDFS, I found that Flink only provides a StringWriter, so I implemented my own writer as shown below:

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Objects;

import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;

public class StringCompressWriter<T> extends StreamWriterBase<T> {

    private static final long serialVersionUID = 1L; 

    private String charsetName; 

    private transient Charset charset; 

    private transient CompressionOutputStream outStream; 


    public StringCompressWriter() { 
        this("UTF-8"); 
    } 

    public StringCompressWriter(String charsetName) { 
        this.charsetName = charsetName; 
    } 

    protected StringCompressWriter(StringCompressWriter<T> other) { 
        super(other); 
        this.charsetName = other.charsetName; 
    } 


    /**
     * Opens the underlying stream and wraps it in a gzip
     * {@link CompressionOutputStream}.
     */
    @Override 
    public void open(FileSystem fs, Path path) throws IOException { 
        super.open(fs, path); 

        this.charset = Charset.forName(charsetName); 

        Configuration conf = fs.getConf(); 

        CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf); 
        CompressionCodec codec = codecFactory.getCodecByName("GzipCodec"); 

        FSDataOutputStream dataOutputStream = getStream(); 
        Compressor compressor = CodecPool.getCompressor(codec, fs.getConf()); 
        outStream = codec.createOutputStream(dataOutputStream, compressor); 
    } 

    @Override 
    public void write(T element) throws IOException { 
        getStream(); // Throws if the stream is not open 
        outStream.write(element.toString().getBytes(charset)); 
        outStream.write('\n'); 
    } 

    @Override 
    public void close() throws IOException { 
        if (outStream != null) { 
            outStream.close(); 
//            outStream = null; 
        } 
        super.close(); 
    } 

    @Override 
    public Writer<T> duplicate() { 
        return new StringCompressWriter<>(this); 
    } 

    @Override 
    public int hashCode() { 
        return Objects.hash(super.hashCode(), charsetName); 
    } 

    @Override 
    public boolean equals(Object other) { 
        if (this == other) { 
            return true; 
        } 
        if (other == null) { 
            return false; 
        } 
        if (getClass() != other.getClass()) { 
            return false; 
        } 
        StringCompressWriter<T> writer = (StringCompressWriter<T>) other; 
        // field comparison 
        return Objects.equals(charsetName, writer.charsetName) 
                && super.equals(other); 
    } 
} 

But when I run my application on YARN, the task manager always fails with the following error:

2018-09-03 15:25:54,187 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: bucketSink (10/15) (67b40f43fc72371f19e61a6ac3f60819) switched from RUNNING to FAILED. 
java.nio.channels.ClosedChannelException 
        at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1618) 
        at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1982) 
        at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1942) 
        at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130) 
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:83) 
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:99) 
        at com.vipkid.bigdata.sink.StringCompressWriter.close(StringCompressWriter.java:73) 
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:570) 
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514) 
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446) 
        at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52) 
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) 
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) 
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104) 
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) 
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) 
        at java.lang.Thread.run(Thread.java:748) 
2018-09-03 15:25:54,191 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job TestBucketing (01e7088de6b4f9cbe22d9f6a3fdbd2fe) switched from state RUNNING to FAILING. 
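Reading the trace against the writer's close(): outStream.close() closes the wrapped HDFS stream as well, and super.close() then calls hflush() on that same, already-closed stream. That wrapper semantic is not specific to Hadoop's CompressionOutputStream; the JDK's own GZIPOutputStream behaves the same way, as this minimal sketch (plain java.util.zip, no Hadoop involved; class and field names are mine) shows:

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class WrapperCloseDemo {
    /** An OutputStream that records whether close() was called on it. */
    static class TrackingStream extends FilterOutputStream {
        boolean closed = false;
        TrackingStream(OutputStream out) { super(out); }
        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    public static void main(String[] args) throws IOException {
        TrackingStream underlying = new TrackingStream(new ByteArrayOutputStream());
        GZIPOutputStream gzip = new GZIPOutputStream(underlying);
        gzip.write("hello\n".getBytes(StandardCharsets.UTF_8));

        // Closing the compressing wrapper also closes the stream it wraps --
        // analogous to what outStream.close() does to the HDFS stream before
        // super.close() tries to hflush() it.
        gzip.close();
        System.out.println("underlying closed: " + underlying.closed);
    }
}
```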

Moreover, when I looked at the data, I found that the output stream apparently had not been closed properly:

hdfs dfs -text /tmp/Test/2018-08-27/part-8-96 | wc -l 

text: Unexpected end of ZLIB input stream
3268
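That message is what the JDK produces when a gzip stream's trailer (CRC + length) was never written, i.e. the compressor was flushed but never finished before the part file was rolled. As a minimal sketch (JDK gzip classes standing in for Hadoop's GzipCodec), data that is flush()ed but never finish()ed reproduces the exact error on read-back:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class TruncatedGzipDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // syncFlush=true so flush() actually emits the compressed bytes
        GZIPOutputStream gzip = new GZIPOutputStream(sink, true);
        gzip.write("some compressed line\n".getBytes(StandardCharsets.UTF_8));
        gzip.flush();
        // No finish()/close(): the gzip trailer is never written, mimicking a
        // part file rolled without finishing the compressor.
        byte[] truncated = sink.toByteArray();

        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(truncated))) {
            byte[] buf = new byte[1024];
            while (in.read(buf) != -1) { /* drain */ }
            System.out.println("decompressed cleanly");
        } catch (EOFException e) {
            System.out.println("EOFException: " + e.getMessage());
        }
    }
}
```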

Can anyone tell me what is going on?

...