У меня есть метод, который записывает один из моих классов Foo
, который определяется как Thrift, в форме паркет.
import Foo
import org.apache.spark.rdd.RDD
import org.apache.thrift.TBase
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.thrift.ParquetThriftOutputFormat
def writeThriftParquet(rdd: RDD[Foo], outputPath: String): Unit = {
val job = Job.getInstance()
ParquetThriftOutputFormat.setThriftClass(job, classOf[Foo])
ParquetOutputFormat.setWriteSupportClass(job, classOf[Foo])
rdd
.map(x => (null, x))
.saveAsNewAPIHadoopFile(
outputPath,
classOf[Void],
classOf[Foo],
classOf[ParquetThriftOutputFormat[Foo]],
job.getConfiguration)
}
Это работает нормально, но я бы предпочел написать более обобщенный c метод. Я попытался (относительно) просто:
def writeThriftParquetGeneral[A <: TBase[_, _]](rdd: RDD[A], outputPath: String): Unit = {
val job = Job.getInstance()
ParquetThriftOutputFormat.setThriftClass(job, classOf[A])
ParquetOutputFormat.setWriteSupportClass(job, classOf[A])
rdd
.map(x => (null, x))
.saveAsNewAPIHadoopFile(
outputPath,
classOf[Void],
classOf[A],
classOf[ParquetThriftOutputFormat[A]],
job.getConfiguration)
}
, но это не с ошибками вроде:
class type required but A found ParquetThriftOutputFormat.setThriftClass(job, classOf[A])
class type required but A found ParquetOutputFormat.setWriteSupportClass(job, classOf[A])
Чтобы попытаться исправить это, я использовал ClassTag
, но убежище не получается что-то скомпилировать.
import scala.reflect._
implicit val ct = ClassTag[Foo](classOf[Foo])
def writeThriftParquetGeneral[A <: TBase[_, _]](rdd: RDD[A], outputPath: String)(
implicit tag: ClassTag[A]): Unit = {
val job = Job.getInstance()
// The problem line
ParquetThriftOutputFormat.setThriftClass(job, tag.runtimeClass)
// Seems OK from here
ParquetOutputFormat.setWriteSupportClass(job, tag.runtimeClass)
rdd
.map(x => (null, x))
.saveAsNewAPIHadoopFile(
outputPath,
classOf[Void],
tag.runtimeClass,
classOf[ParquetThriftOutputFormat[A]],
job.getConfiguration)
}
В строке не получается: ParquetThriftOutputFormat.setThriftClass(job, tag.runtimeClass)
[error] found : Class[_$1] where type _$1
[error] required: Class[_ <: org.apache.thrift.TBase[_, _]]
Я удивлен, что компилятор (Scala 2.11) не распознает что tag.runtimeClass
должно быть classOf[A]
, а A
удовлетворяет типу, ограниченному определением.