Flink ParquetSinkWriter FileAlreadyExistsException - PullRequest
0 голосов
/ 07 февраля 2020

Я пытаюсь использовать Apache Flink записать файл паркета в HDFS с помощью BucketingSink и пользовательского ParquetSinkWriter.

Здесь приведен код ошибки и выше, когда включается контрольная точка (вызов snapshotState () в BucketingSink Класс) грипп sh метод снизу не работает тихо. Даже писатель закрыт с "writer.close ();" но все равно получил ошибку от "writer = createWriter ();". Какие-нибудь мысли? спасибо

Получена ошибка, подобная этой:

org. apache .had oop .fs.FileAlreadyExistsException: / user / hive / flink_parquet_fils_with_checkingpoint / year = 20 / month = 2 /day=1/hour=17/_part-4-9.in-progress для клиента 192.168.56.202 уже существует в org. apache .had oop .hdfs.server.namenode.FSNamesystem.startFileInternal (FSNamesystem. java: 3003) в орг. apache .had oop .hdfs.server.namenode.FSNamesystem.startFileInt (FSNamesystem. java: 2890)

..... at flink.untils.ParquetSinkWriter.flu sh (ParquetSinkWriterForecast. java: 81) в org. apache .flink.streaming.connectors.fs.bucketing.BucketingSink.snapshotState (BucketingSink. java: 749)

import org.apache.flink.util.Preconditions;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

/**
 * Parquet writer.
 *
 * @param <T>
 */
public class ParquetSinkWriter<T extends GenericRecord> implements Writer<T> {

    private static final long serialVersionUID = -975302556515811398L;

    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private final int pageSize = 64 * 1024;

    private final String schemaRepresentation;

    private transient Schema schema;
    private transient ParquetWriter<GenericRecord> writer;
    private transient Path path;

    private int position;

    public ParquetSinkWriter(String schemaRepresentation) {
        this.schemaRepresentation = Preconditions.checkNotNull(schemaRepresentation);
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        this.position = 0;
        this.path = path;

        if (writer != null) {
            writer.close();
        }

        writer = createWriter();
    }

    @Override
    public long flush() throws IOException {
        Preconditions.checkNotNull(writer);
        position += writer.getDataSize();
        writer.close();
        writer = createWriter();

        return position;
    }

    @Override
    public long getPos() throws IOException {
        Preconditions.checkNotNull(writer);
        return position + writer.getDataSize();
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
            writer = null;
        }
    }

    @Override
    public void write(T element) throws IOException {
        Preconditions.checkNotNull(writer);
        writer.write(element);
    }

    @Override
    public Writer<T> duplicate() {
        return new ParquetSinkWriter<>(schemaRepresentation);
    }

    private ParquetWriter<GenericRecord> createWriter() throws IOException {
        if (schema == null) {
            schema = new Schema.Parser().parse(schemaRepresentation);
        }

        return AvroParquetWriter.<GenericRecord>builder(path)
            .withSchema(schema)
            .withDataModel(new GenericData())
            .withCompressionCodec(compressionCodecName)
            .withPageSize(pageSize)
            .build();
    }
}


1 Ответ

0 голосов
/ 08 февраля 2020

Кажется, что файл, который вы пытаетесь создать, в настоящее время существует. Это потому, что вы используете режим записи по умолчанию CREATE, который не работает, когда файл существует. Вы можете попытаться изменить свой код для использования режима OVERWRITE. Вы можете изменить метод createWriter(), чтобы он выглядел примерно так:

return AvroParquetWriter.<GenericRecord>builder(path)
            .withSchema(schema)
            .withDataModel(new GenericData())
            .withCompressionCodec(compressionCodecName)
            .withPageSize(pageSize)
            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
            .build();

...