Мы хотим записать сжатые данные в HDFS с помощью Flink's BucketingSink или StreamingFileSink.Я написал свой собственный писатель, который работает нормально, если не происходит сбой.Однако, когда он обнаруживает сбой и перезапускается с контрольной точки, он генерирует файл действительной длины (hadoop <2.7) или усекает файл.К несчастью, gzips - это двоичные файлы, у которых в конце файла есть трейлер.Поэтому простое усечение не работает в моем случае.Любые идеи, чтобы включить точно один раз семантику для сжатия hdfs раковина? </p>
Это код моего автора:
public class HdfsCompressStringWriter extends StreamWriterBaseV2<JSONObject> {
private static final long serialVersionUID = 2L;
/**
* The {@code CompressFSDataOutputStream} for the current part file.
*/
private transient GZIPOutputStream compressionOutputStream;
public HdfsCompressStringWriter() {}
@Override
public void open(FileSystem fs, Path path) throws IOException {
super.open(fs, path);
this.setSyncOnFlush(true);
compressionOutputStream = new GZIPOutputStream(this.getStream(), true);
}
public void close() throws IOException {
if (compressionOutputStream != null) {
compressionOutputStream.close();
compressionOutputStream = null;
}
resetStream();
}
@Override
public void write(JSONObject element) throws IOException {
if (element == null || !element.containsKey("body")) {
return;
}
String content = element.getString("body") + "\n";
compressionOutputStream.write(content.getBytes());
compressionOutputStream.flush();
}
@Override
public Writer<JSONObject> duplicate() {
return new HdfsCompressStringWriter();
}
}