не удалось найти неявное значение для параметра sparkSession - PullRequest
0 голосов
/ 15 марта 2019

У меня есть блокнот с кодом ниже, который выдает ошибку:

не удалось найти неявное значение для параметра sparkSession

import org.apache.spark.sql.{SparkSession, Row, DataFrame}
import org.apache.spark.ml.clustering.KMeans

def createBalancedDataframe(df:DataFrame, reductionCount:Int)(implicit sparkSession:SparkSession) = {

    val kMeans = new KMeans().setK(reductionCount).setMaxIter(30)
    val kMeansModel = kMeans.fit(df)

    import sparkSession.implicits._
    kMeansModel.clusterCenters.toList.map(v => (v, 0)).toDF("features", "label")
  }

val balancedNonFraudDF = createBalancedDataframe(nonFraudDF, fraudCount.toInt)

Ошибка:

Name: Compile Error
Message: <console>:82: error: could not find implicit value for parameter sparkSession: org.apache.spark.sql.SparkSession
       val balancedNonFraudDF = createBalancedDataframe(nonFraudDF, fraudCount.toInt)
                                                       ^

StackTrace: 

Буду очень признателен, если кто-нибудь сможет предложить какую-либо помощь, заранее большое спасибо.

UPDATE:

Благодаря вводу Редди, после того, как я изменил его на

val balancedNonFraudDF = createBalancedDataframe(nonFraudDF, fraudCount.toInt)(spark)

Я получаю следующую ошибку:

Name: java.lang.IllegalArgumentException
Message: Field "features" does not exist.
Available fields: cc_num, trans_num, trans_time, category, merchant, amt, merch_lat, merch_long, distance, age, is_fraud
StackTrace: Available fields: cc_num, trans_num, trans_time, category, merchant, amt, merch_lat, merch_long, distance, age, is_fraud
  at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:267)
  at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:267)
  at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
  at scala.collection.AbstractMap.getOrElse(Map.scala:59)
  at org.apache.spark.sql.types.StructType.apply(StructType.scala:266)
  at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:40)
  at org.apache.spark.ml.clustering.KMeansParams$class.validateAndTransformSchema(KMeans.scala:93)
  at org.apache.spark.ml.clustering.KMeans.validateAndTransformSchema(KMeans.scala:254)
  at org.apache.spark.ml.clustering.KMeans.transformSchema(KMeans.scala:340)
  at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
  at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:305)
  at createBalancedDataframe(<console>:45)

UPDATE2:

featureDF.printSchema
root
 |-- cc_num: long (nullable = true)
 |-- category: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- distance: double (nullable = true)
 |-- amt: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- category_indexed: double (nullable = false)
 |-- category_encoded: vector (nullable = true)
 |-- merchant_indexed: double (nullable = false)
 |-- merchant_encoded: vector (nullable = true)
 |-- features: vector (nullable = true)

val fraudDF = featureDF
      .filter($"is_fraud" === 1)
      .withColumnRenamed("is_fraud", "label")
      .select("features", "label")

fraudDF.printSchema
root
 |-- cc_num: long (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- trans_time: string (nullable = true)
 |-- category: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- amt: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- distance: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- is_fraud: integer (nullable = true)

Почему функция исчезла?

enter image description here

1 Ответ

0 голосов
/ 15 марта 2019

Предполагая, что у вас есть SparkSession и он называется spark

, вы можете явно передать его таким образом

val balancedNonFraudDF = createBalancedDataframe(nonFraudDF, fraudCount.toInt)(spark)

или создать неявную ссылку (spark2 или любое имя) в вызывающей среде.Пример:

implicit val spark2 = spark
//some calls
// others
val balancedNonFraudDF = createBalancedDataframe(nonFraudDF, fraudCount.toInt)
...