Как сделать отказоустойчивость для Flink для передачи данных в hdfs как сжатие gzip? - PullRequest
2 голосов
/ 06 июня 2019

Мы хотим записать сжатые данные в HDFS с помощью Flink's BucketingSink или StreamingFileSink.Я написал свой собственный писатель, который работает нормально, если не происходит сбой.Однако, когда он обнаруживает сбой и перезапускается с контрольной точки, он генерирует файл действительной длины (hadoop <2.7) или усекает файл.К несчастью, gzips - это двоичные файлы, у которых в конце файла есть трейлер.Поэтому простое усечение не работает в моем случае.Любые идеи, чтобы включить точно один раз семантику для сжатия hdfs раковина? </p>

Это код моего автора:

public class HdfsCompressStringWriter extends StreamWriterBaseV2<JSONObject> {

private static final long serialVersionUID = 2L;

/**
 * The {@code CompressFSDataOutputStream} for the current part file.
 */
private transient GZIPOutputStream compressionOutputStream;

public HdfsCompressStringWriter() {}

@Override
public void open(FileSystem fs, Path path) throws IOException {
    super.open(fs, path);
    this.setSyncOnFlush(true);
    compressionOutputStream = new GZIPOutputStream(this.getStream(), true);
}

public void close() throws IOException {
    if (compressionOutputStream != null) {
        compressionOutputStream.close();
        compressionOutputStream = null;
    }
    resetStream();
}

@Override
public void write(JSONObject element) throws IOException {
    if (element == null || !element.containsKey("body")) {
        return;
    }
    String content = element.getString("body") + "\n";
    compressionOutputStream.write(content.getBytes());
    compressionOutputStream.flush();
}

@Override
public Writer<JSONObject> duplicate() {
    return new HdfsCompressStringWriter();
}

}

1 Ответ

1 голос
/ 07 июня 2019

Я бы порекомендовал реализовать BulkWriter для StreamingFileSink, который сжимает элементы с помощью GZIPOutputStream.Код может выглядеть следующим образом:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(1000);

    final DataStream<Integer> input = env.addSource(new InfinitySource());

    final StreamingFileSink<Integer> streamingFileSink = StreamingFileSink.<Integer>forBulkFormat(new Path("output"), new GzipBulkWriterFactory<>()).build();
    input.addSink(streamingFileSink);

    env.execute();
}

private static class GzipBulkWriterFactory<T> implements BulkWriter.Factory<T> {
    @Override
    public BulkWriter<T> create(FSDataOutputStream fsDataOutputStream) throws IOException {
        final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(fsDataOutputStream, true);
        return new GzipBulkWriter<>(new ObjectOutputStream(gzipOutputStream), gzipOutputStream);
    }
}

private static class GzipBulkWriter<T> implements BulkWriter<T> {

    private final GZIPOutputStream gzipOutputStream;
    private final ObjectOutputStream objectOutputStream;

    public GzipBulkWriter(ObjectOutputStream objectOutputStream, GZIPOutputStream gzipOutputStream) {
        this.gzipOutputStream = gzipOutputStream;
        this.objectOutputStream = objectOutputStream;
    }

    @Override
    public void addElement(T t) throws IOException {
        objectOutputStream.writeObject(t);
    }

    @Override
    public void flush() throws IOException {
        objectOutputStream.flush();
    }

    @Override
    public void finish() throws IOException {
        objectOutputStream.flush();
        gzipOutputStream.finish();
    }
}
...