Невозможно прочитать файл паркета с вложенными записями, используя ParquetAvro - PullRequest
0 голосов
/ 30 августа 2018

Я пытаюсь прочитать файл паркета в Scala, используя интерфейс Avro (1.10.). Файл также был создан с использованием того же интерфейса.

Данные, которые я пишу, выглядят так:

case class Inner(b: Array[Int])

case class Outer(a: Array[Inner])


val data = Outer(
  Array(
    Inner(Array(1, 2)),
    Inner(Array(3, 4))
  )
)

Использование parquet-tools для чтения прочитанного файла выглядит так:

$parquet-tools cat /tmp/test.parquet    
a:
.array:
..b:
...array = 1
...array = 2
.array:
..b:
...array = 3
...array = 4

Но при попытке прочитать файл я получаю следующее исключение:

Exception in thread "main" org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'array' not found
    at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:225)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:130)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:232)
    at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:78)
    at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:536)
    at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:486)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
    at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
    at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
    at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
    at raw.runtime.writer.parquet.avro.Lixo$.main(Lixo.scala:78)
    at raw.runtime.writer.parquet.avro.Lixo.main(Lixo.scala)

Этот код используется для генерации файла:

val filename = "/tmp/test.parquet"
val path = Paths.get(filename).toFile
val conf = new Configuration()

val schema: Schema = {

  val inner = Schema.createRecord("inner", "some doc", "outer", false,
    List(new Schema.Field("b", Schema.createArray(Schema.create(Schema.Type.INT)), "", null: Object)).asJava
  )

  Schema.createRecord("outer", "", "", false,
    List(new Schema.Field("a", Schema.createArray(inner), "", null: Object)).asJava
  )
}

val os = new FileOutputStream(path)

val outputFile = new RawParquetOutputFile(os)
val parquetWriter: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](outputFile)
  .withConf(conf)
  .withSchema(schema)
  .build()

val data = Outer(
  Array(
    Inner(Array(1, 2)),
    Inner(Array(3, 4))
  )
)

val record = new GenericData.Record(schema)
val fieldA = schema.getField("a").schema()
val recorData = {
  val fieldAType = fieldA.getElementType()
  data.a.map { x =>
    val innerRecord = new GenericData.Record(fieldAType)
    innerRecord.put("b", x.b)
    innerRecord
  }
}

record.put("a", recorData)
parquetWriter.write(record)
parquetWriter.close()
os.close()

Я что-то не так делаю?

...