Здесь нет никакого вывода вообще.Вместо этого вы получаете более или менее то, что вы просите.В частности, ошибка здесь:
override def outputEncoder: Encoder[Map[String, Int]] = Encoders.kryo[Map[String, Int]]
Encoders.kryo
означает, что вы применяете сериализацию общего назначения и возвращаете двоичный двоичный объект.Вводящая в заблуждение часть - .as[Map[String, Int]]
- вопреки тому, что можно ожидать, она не проверяется статически.Что еще хуже, он даже не был предварительно проверен планировщиком запросов, и исключение времени выполнения - throw, только когда result
вычисляется.
result.first
org.apache.spark.sql.AnalysisException: cannot resolve '`my_sum`' due to data type mismatch: cannot cast binary to map<string,int>;
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:115)
...
Вы должны предоставить конкретные Encoder
вместо этого либо явно :
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
def outputEncoder: Encoder[Map[String, Int]] = ExpressionEncoder()
, либо неявно
class NoopAgg[I](implicit val enc: Encoder[Map[String, Int]]) extends Aggregator[I, Map[String, Int], Map[String, Int]] {
...
override def outputEncoder: Encoder[Map[String, Int]] = enc
}
В качестве побочного эффекта это сделает as[Map[String, Int]]
устаревшим, как тип возвращаемого значенияAggregator
уже известно.