Я пытаюсь запустить пример приложения локально, используя: Scala (2.11), Spark (2.3.0) с API-интерфейсом streamset версии 3.8.0.
(я пытаюсь запустить преобразование искры, как описанов этом уроке: https://github.com/streamsets/tutorials/blob/master/tutorial-spark-transformer-scala/readme.md)
Сначала я создаю JavaRDD [запись], что-то вроде:
val testrecord = spark.read.json("...path to json file").toJavaRDD.asInstanceOf[JavaRDD[Record]]
Затем я передаю эту JavaRDD [запись] в метод преобразованияв классе DTStream:
new DTStream().transform(testrecord)
Метод Transform в самом классе DTStream очень прост:
@override def transform(javaRDD: JavaRDD[Record]): TransformResult = {
val recordRDD = javaRDD.rdd
val resultMessage = recordRDD.map((record) => record) //Just trying to pass incoming record as outgoing record - no transformation at all.
new TransformResult (resultMessage.toJavaRDD, error) // where error is already defined as a JavaPairRDD.
}
Когда я пробую этот простой код, я получаю следующее исключение именно вэта строка:
val resultMessage = recordRDD.map((record) => record)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to com.streamsets.pipeline.api.Record.
Есть ли какие-либо указания относительно того, почему я могу получить это и как решить?Заранее спасибо.
Примечание: запись - api / datacollector-api / запись: https://github.com/streamsets/datacollector-api/blob/master/src/main/java/com/streamsets/pipeline/api/Record.java