Ниже приведен код и ошибка сборки, которую я вижу. Можете ли вы сказать, как я могу решить эту ошибку. Это полный код. URL были опущены. Используемая версия Spark - 1.6.0 Scala Используемая версия: 2.10.5
Код
import java.net.{HttpURLConnection, URL}
import org.slf4j.LoggerFactory
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import java.util.Arrays
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType._
import scala.Predef.exceptionWrapper
object CoExtract {
private val logger = LoggerFactory.getLogger(getClass)
def main(args: Array[String]) {
val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
val sqlcontext = new SQLContext(sc)
// val sqlcontext = new HiveContext(sc)
import sqlcontext.implicits._
val obj = new Connection
val string_response=obj.getCsvResponse(obj.getConnection(new URL("https://")))
val array_response = string_response.split("\n")
logger.info("The length of the array is "+ array_response.length)
val rdd_response=sc.parallelize(array_response.toSeq)
logger.info("The count of elements in the rdd are "+rdd_response.count())
val header = rdd_response.first()
logger.info("header"+header)
val noheaderRDD = rdd_response.filter(_ != header)
logger.info("NoheaderRDD is"+noheaderRDD.first())
val subsetRdd=noheaderRDD.map( x => (Row(
x.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)",-1)(0),
x.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)",-1)(1),
x.split(",")(2)
)
)
)
val x =subsetRdd.zipWithIndex().collect()
val schema = new StructType()
.add(StructField("Email",StringType, true))
.add(StructField("Recipient",StringType, true))
.add(StructField("rowid", LongType, false))
val rdd_to_df = sqlcontext.createDataFrame(subsetRdd,schema)
val df_to_rdd_again = rdd_to_df.rdd.zipWithIndex
rdd_to_df.withColumn("rowid", row_number.over(Window.partitionBy(lit(1)).orderBy(lit(1))))
val final_df = sqlcontext.createDataFrame(df_to_rdd_again.map{case (row,index) => Row.fromSeq(row.toSeq ++ Seq(index))}, schema )
val start: Int = 0
val end: Int = rdd_to_df.count().toInt
var counter:Int = start
logger.info("Final Count" + end)
logger.info("The schema of the dataframe is "+rdd_to_df.printSchema())
final_df.show(100,false)
logger.info("Schema of rdd to df" + rdd_to_df.printSchema())
logger.info("schema of final_df" + final_df.printSchema())
val df_response = sqlcontext.read.format("com.databricks.spark.csv").option("header", "true").load("hdfs:///")
logger.info («Схема фрейма данных» + df_response) logger.info («Количество фреймов данных» + df_response.count ())
}}
Ошибка сборки
scala:48: error: type mismatch;
[ERROR] found : Array[String]
[ERROR] required: Seq[?]
[ERROR] Error occurred in an application involving default arguments.
[INFO] val rdd_response=sc.parallelize(array_response)
[INFO] ^