Ошибка преобразования искры Scala при создании фрейма данных - PullRequest
0 голосов
/ 24 сентября 2018

Я новичок в скале.Пожалуйста, наберитесь терпения.

У меня есть этот код.

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// create spark session
implicit val spark = SparkSession.builder().appName("clustering").getOrCreate()

// read file
val fileName = """file:///some_location/head_sessions_sample.csv"""

// create DF from file
val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(fileName)

def inputKmeans(df: DataFrame,spark: SparkSession): DataFrame = {
    try {
      val a = df.select("id", "start_ts", "duration", "ip_dist").map(r => (r.getInt(0), Vectors.dense(r.getDouble(1), r.getDouble(2), r.getDouble(3)))).toDF("id", "features")
      a
    }
    catch {
      case e: java.lang.ClassCastException => spark.emptyDataFrame
   }
}


val t = inputKmeans(df).filter( _ != null )
t.foreach(r =>
            if (r.get(0) != null)
              println(r.get(0)))

На данный момент я хочу игнорировать ошибки конвертации.Но почему-то они у меня все еще есть.

2018-09-24 11:26:22 ОШИБКА Исполнитель: 91 - Исключение в задаче 0.0 на этапе 4.0 (TID 6) java.lang.ClassCastException: java.lang.Long не может быть приведен к java.lang.Double

Я не думаю, что есть смысл давать снимок csv.На данный момент, я просто хочу игнорировать ошибки преобразования.Есть идеи, почему это происходит?

Ответы [ 2 ]

0 голосов
/ 03 октября 2018

я думаю, что обнаружил проблему.«try catch» размещается на уровне создания DF, а не на уровне конверсии.следовательно, он улавливает проблемы, связанные с созданием DF, а не проблемы преобразования.

0 голосов
/ 24 сентября 2018

Как упоминалось в комментарии, проблема заключается в том, что значения не относятся к типу Double.

val a = df.select("id", "start_ts", "duration", "ip_dist").map(r => (r.getInt(0), Vectors.dense(r.getDouble(1), r.getDouble(2), r.getDouble(3)))).toDF("id", "features")

Либо приведен к типу Correct DataType, то есть к типу Long (вы также можете указатьСхема явно использует Case Class и применяет схему к DataFrame).

Или используйте VectorAssembler для преобразования столбцов в объекты.Это более простой и рекомендуемый подход.

import org.apache.spark.ml.feature.VectorAssembler 
def inputKmeans(df: DataFrame,spark: SparkSession): DataFrame = {
  val assembler = new VectorAssembler().setInputCols(Array("start_ts", "duration", "ip_dist")).setOutputCol("features")
  val output = assembler.transform(df).select("id", "features")
  output
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...