Scala: запись данных в корзину S3 - PullRequest
0 голосов
/ 18 ноября 2018

Я пытаюсь записать данные в корзину S3, но я получаю ошибки ниже.

SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
18/11/18 23:32:14 ERROR Utils: Aborting task
java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/11/18 23:32:14 WARN FileOutputCommitter: Could not delete s3a://Accesskey:SecretKey@test-bucket/Output/Check1Result/_temporary/0/_temporary/attempt_20181118233210_0004_m_000000_0
18/11/18 23:32:14 ERROR FileFormatWriter: Job job_20181118233210_0004 aborted.
18/11/18 23:32:14 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 209)
org.apache.spark.SparkException: Task failed while writing rows.

Я попробовал приведенный ниже код и могу записать данные в локальную файловую систему. Но когда я пытаюсь записать данные в корзину S3, я получаю ошибки выше.

Мой код:

package Spark_package

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object dataload {
  def main(args: Array[String]) {
    val spark = SparkSession.builder.master("local[*]").appName("dataload").config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2").getOrCreate()
    val sc = spark.sparkContext
    sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

    val conf = new SparkConf().setAppName("dataload").setMaster("local[*]").set("spark.speculation","false")
    val sqlContext = spark.sqlContext


    val data = "C:\\docs\\Input_Market.csv"
    val ddf = spark.read.format("csv").option("inferSchema","true").option("header","true").option("delimiter",",").load(data)
    ddf.createOrReplaceTempView("data")
    val res = spark.sql("select count(*),cust_id,sum_cnt from data group by cust_id,sum_cnt")
    res.write.option("header","true").format("csv").save("s3a://Accesskey:SecretKey@test-bucket/Output/Check1Result1")


    spark.stop()
  }
}
...