Проблема в том, что вы не закрываете ParquetWriter
.Это необходимо для сброса ожидающих элементов на диск.Вы можете решить эту проблему, определив свой собственный RichSinkFunction
, где вы закрываете ParquetWriter
в методе close
:
class ParquetWriterSink(val path: String, val schema: String, val compressionCodecName: CompressionCodecName, val config: ParquetWriterConfig) extends RichSinkFunction[GenericRecord] {
var parquetWriter: ParquetWriter[GenericRecord] = null
override def open(parameters: Configuration): Unit = {
parquetWriter = AvroParquetWriter.builder[GenericRecord](new Path(path))
.withSchema(new Schema.Parser().parse(schema))
.withCompressionCodec(compressionCodecName)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withValidation(config.validating)
.build()
}
override def close(): Unit = {
parquetWriter.close()
}
override def invoke(value: GenericRecord, context: SinkFunction.Context[_]): Unit = {
parquetWriter.write(value)
}
}