Невозможно прочитать файл из хранилища BLOB-объектов Azure из Connect Apache Spark от Databrick - PullRequest
0 голосов
/ 11 ноября 2019

Я настроил подключение к базе данных в Azure для запуска моих искровых программ в облаке Azure. Для пробного запуска я проверил программу WordCount. Но программа завершается с ошибкой:

"Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:"

Я использую Intellij для запуска программы. У меня есть необходимые разрешения для доступа к кластеру. Но я все еще получаю эту ошибку.

Следующая программа представляет собой обертку, которая принимает параметры и публикует результаты.

package com.spark.scala
import com.spark.scala.demo.{Argument, WordCount}
import org.apache.spark.sql.SparkSession

import com.databricks.dbutils_v1.DBUtilsHolder.dbutils

import scala.collection.mutable.Map

object Test {
  def main(args: Array[String]): Unit = {
    val argumentMap: Map[String, String] = Argument.parseArgs(args)
    val spark = SparkSession
      .builder()
      .master("local")

      .getOrCreate()
    println(spark.range(100).count())


    val rawread = String.format("/mnt/%s", argumentMap.get("--raw-reads").get)
    val data = spark.sparkContext.textFile(rawread)

    print(data.count())

    val rawwrite = String.format("/dbfs/mnt/%s", argumentMap.get("--raw-write").get)
    WordCount.executeWordCount(spark, rawread, rawwrite);
    // The Spark code will execute on the Databricks cluster.
    spark.stop()
  }
}

Следующий код выполняет логику подсчета слов: -

package com.spark.scala.demo


import org.apache.spark.sql.SparkSession

object WordCount{

  def executeWordCount(sparkSession:SparkSession, read: String, write: String)
  {
    println("starting word count process ")


    //val path = String.format("/mnt/%s", "tejatest\wordcount.txt")

    //Reading input file and creating rdd with no of partitions 5
    val bookRDD=sparkSession.sparkContext.textFile(read)

    //Regex to clean text
    val pat = """[^\w\s\$]"""
    val cleanBookRDD=bookRDD.map(line=>line.replaceAll(pat, ""))

    val wordsRDD=cleanBookRDD.flatMap(line=>line.split(" "))

    val wordMapRDD=wordsRDD.map(word=>(word->1))

    val wordCountMapRDD=wordMapRDD.reduceByKey(_+_)



    wordCountMapRDD.saveAsTextFile(write)



  }
}

Я написал картограф для сопоставления указанных путей, и я передаю места для чтения и записи через командную строку. Мой pom.xml выглядит следующим образом: -

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ex-com.spark.scala</groupId>
    <artifactId>ex- demo</artifactId>
    <version>1.0-SNAPSHOT</version>


    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
            <scope>compile</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.clapper</groupId>
            <artifactId>grizzled-slf4j_2.11</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>dbutils-api_2.11</artifactId>
            <version>0.0.3</version>
        </dependency>
        <!-- Test -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>


</project>
...