ошибка: путь не существует в искровой отправке с помощью hadoop - PullRequest
0 голосов
/ 07 июня 2018

Мы используем команду /home/ubuntu/spark/bin/spark-submit --master yarn --deploy-mode cluster --class "SimpleApp" /home/ubuntu/spark/examples/src/main/scala/sbt/target/scala-2.11/teste_2.11-1.0.jar для запуска сценария ниже

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import org.apache.spark._
import org.apache.spark
import org.apache.spark.sql
import org.apache.spark.SparkContext._


object SimpleApp {
     def main(args: Array[String]) {

     val spark = SparkSession.builder().appName("query1").master("yarn").getOrCreate
     val header = StructType(Array(
             StructField("medallion", StringType, true),
             StructField("hack_license", StringType, true),
             StructField("vendor_id", StringType, true),
             StructField("rate_code", IntegerType, true),
             StructField("store_and_fwd_flag", StringType, true),
             StructField("pickup_datetime", TimestampType, true),
             StructField("dropoff_datetime", TimestampType, true),
             StructField("passenger_count", IntegerType, true),
             StructField("trip_time_in_secs", IntegerType, true),
             StructField("trip_distance", FloatType, true),
             StructField("pickup_longitude", FloatType, true),
             StructField("pickup_latitude", FloatType, true),
             StructField("dropoff_longitude", FloatType, true),
             StructField("dropoff_latitude", FloatType, true),
             StructField("payment_type", StringType, true),
             StructField("fare_amount", FloatType, true),
             StructField("surcharge", FloatType, true),
             StructField("mta_tax", FloatType, true),
             StructField("trip_amount", FloatType, true),
             StructField("tolls_amount", FloatType, true),
             StructField("total_amount", FloatType, true),
             StructField("zone", StringType, true)))

     val nyct = spark.read.format("csv").option("delimiter", ",").option("header", "true").schema(header).load("/home/ubuntu/trip_data/trip_data_fare_1.csv")
     nyct.createOrReplaceTempView("nyct_temp_table")

     spark.time(spark.sql("""SELECT zone, COUNT(*) AS accesses FROM nyct_temp_table WHERE (HOUR(dropoff_datetime) >= 8 AND HOUR(dropoff_datetime) <= 19) GROUP BY zone ORDER BY accesses DESC""").show())

     }
 }

Идея состоит в том, чтобы запустить запрос в сценарии в кластер с помощью spark и Hadoop.Но в конце выполнения это генерирует ошибку, чтобы зарезервировать CSV-файл из пути /home/ubuntu/trip_data/trip_data_fare_1.csv. Это изображение ошибки

Я думаю, что проблема в том, что подчиненный узел не может найти файл в главном каталоге.Кто-то знает, как я могу это исправить и запустить этот скрипт в кластере?

Ответы [ 2 ]

0 голосов
/ 07 июня 2018

Если я не ошибаюсь, то Spark рассматривает вашу локальную файловую систему в качестве файловой системы по умолчанию, поэтому вы сталкиваетесь с этой ошибкой. Конфигурации должны быть переданы в контекст Spark, и вы должны упомянуть HADOOP_CONF_DIR в spark-env.shфайл во всех узлах. Убедитесь, что HADOOP_CONF_DIR указан во всех узлах

val spCont = <Spark Context>
val config = spCont.hadoopConfiguration

config.addResource(new Path(s"${HADOOP_HOME}<path to core-site.xml>"))
0 голосов
/ 07 июня 2018

Поскольку вы работаете в кластере, у вас должен быть этот файл в формате hdfs.Вы можете скопировать файлы из локальной файловой системы в HDFS, используя следующую команду:

hadoop fs -put source_path dest_path

Затем используйте dest_path в своем коде.

Для вас это нужно сделать на хосте с локальнымfile:

hadoop fs -put /home/ubuntu/trip_data/trip_data_fare_1.csv <some_hdfs_location>

Убедитесь, что копия сработала, выполнив следующие действия:

hdfs dfs -ls <some_hdfs_location>
...