Ошибка несоответствия типов. found Array [String] требует Seq [?] в Scala Spark - PullRequest
0 голосов
/ 06 января 2020

Ниже приведен код и ошибка сборки, которую я вижу. Можете ли вы сказать, как я могу решить эту ошибку. Это полный код. URL были опущены. Используемая версия Spark - 1.6.0 Scala Используемая версия: 2.10.5

Код

import java.net.{HttpURLConnection, URL}
import org.slf4j.LoggerFactory
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import java.util.Arrays
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType._
import scala.Predef.exceptionWrapper

object CoExtract {

    private val logger = LoggerFactory.getLogger(getClass)
    def main(args: Array[String]) {

    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)
    val sqlcontext = new SQLContext(sc)
//  val sqlcontext = new HiveContext(sc)

    import sqlcontext.implicits._

    val obj = new Connection
    val string_response=obj.getCsvResponse(obj.getConnection(new URL("https://")))

    val array_response = string_response.split("\n")
    logger.info("The length of the array is "+ array_response.length)
    val rdd_response=sc.parallelize(array_response.toSeq)

    logger.info("The count of elements in the rdd are "+rdd_response.count())

val header = rdd_response.first()
logger.info("header"+header)

val noheaderRDD = rdd_response.filter(_ != header)
logger.info("NoheaderRDD is"+noheaderRDD.first())

val subsetRdd=noheaderRDD.map( x => (Row(
  x.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)",-1)(0),
  x.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)",-1)(1),
  x.split(",")(2)

)
  )
)

val x =subsetRdd.zipWithIndex().collect()

val schema = new StructType()
  .add(StructField("Email",StringType, true))
  .add(StructField("Recipient",StringType, true))
  .add(StructField("rowid", LongType, false))


val rdd_to_df = sqlcontext.createDataFrame(subsetRdd,schema)
val df_to_rdd_again = rdd_to_df.rdd.zipWithIndex

rdd_to_df.withColumn("rowid", row_number.over(Window.partitionBy(lit(1)).orderBy(lit(1))))
val final_df = sqlcontext.createDataFrame(df_to_rdd_again.map{case (row,index) => Row.fromSeq(row.toSeq ++ Seq(index))}, schema )

val start: Int = 0
val end: Int = rdd_to_df.count().toInt
var counter:Int = start



 logger.info("Final Count" + end)
    logger.info("The schema of the dataframe is "+rdd_to_df.printSchema())
final_df.show(100,false)

logger.info("Schema of rdd to df" + rdd_to_df.printSchema())
logger.info("schema of final_df" + final_df.printSchema())

val df_response = sqlcontext.read.format("com.databricks.spark.csv").option("header", "true").load("hdfs:///")

logger.info («Схема фрейма данных» + df_response) logger.info («Количество фреймов данных» + df_response.count ())

}}

Ошибка сборки

scala:48: error: type mismatch;
[ERROR]  found   : Array[String]
[ERROR]  required: Seq[?]
[ERROR] Error occurred in an application involving default arguments.
[INFO]     val rdd_response=sc.parallelize(array_response)
[INFO]                                     ^

1 Ответ

0 голосов
/ 06 января 2020

Просто конвертируйте Array в Seq:

val rdd_response=sc.parallelize(array_response.toSeq)
...