flink сток в файл паркета с AvroParquetWriter не записывает данные в файл - PullRequest
0 голосов
/ 29 ноября 2018

Я пытаюсь записать файл паркета как сток, используя AvroParquetWriter.Файл создан, но имеет длину 0 (данные не записываются).Я делаю что-то неправильно ?не мог понять в чем проблема

import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
  def now = System.currentTimeMillis()
  val path = new Path(s"/tmp/test-$now.parquet")
  val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)
  val compressionCodecName = CompressionCodecName.SNAPPY
  val config = ParquetWriterConfig()
  val genericReocrd: GenericRecord = new GenericData.Record(schema)
  genericReocrd.put("name", "test_b")
  genericReocrd.put("code", "NoError")
  genericReocrd.put("ts", 100L)
  val stream = env.fromElements(genericReocrd)
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(schema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()

  writer.write(genericReocrd)
  stream.addSink{r =>
    writer.write(r)
  }
  env.execute()

1 Ответ

0 голосов
/ 29 ноября 2018

Проблема в том, что вы не закрываете ParquetWriter.Это необходимо для сброса ожидающих элементов на диск.Вы можете решить эту проблему, определив свой собственный RichSinkFunction, где вы закрываете ParquetWriter в методе close:

class ParquetWriterSink(val path: String, val schema: String, val compressionCodecName: CompressionCodecName, val config: ParquetWriterConfig) extends RichSinkFunction[GenericRecord] {
  var parquetWriter: ParquetWriter[GenericRecord] = null

  override def open(parameters: Configuration): Unit = {
    parquetWriter = AvroParquetWriter.builder[GenericRecord](new Path(path))
      .withSchema(new Schema.Parser().parse(schema))
      .withCompressionCodec(compressionCodecName)
      .withPageSize(config.pageSize)
      .withRowGroupSize(config.blockSize)
      .withDictionaryEncoding(config.enableDictionary)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .withValidation(config.validating)
      .build()
  }

  override def close(): Unit = {
    parquetWriter.close()
  }

  override def invoke(value: GenericRecord, context: SinkFunction.Context[_]): Unit = {
    parquetWriter.write(value)
  }
}
...