Выгрузить данные Json в CosmosDB из Dataframe с помощью intelliJ - PullRequest
0 голосов
/ 09 июля 2019

Мне нужно выгрузить данные json в cosmosDB из фрейма данных spark, используя scala и intelliJ. Я читаю CSV-файл с моего локального компьютера и преобразовываю его в формат JSON. Теперь мне нужно выгрузить эти данные json в коллекцию cosmosDB.

Версия Spark 2.2.0 и версия scala 2.11.8

Ниже приведен код, который я написал в IntelliJ с помощью scala для извлечения файла csv с моего локального компьютера и преобразования его в файл json.

import org.apache.spark.sql.SparkSession
import com.microsoft.azure.cosmosdb.spark.config.Config 

object DataLoadConversion {
def main(args: Array[String]): Unit = {

    System.setProperty("spark.sql.warehouse.dir", "file:///C:/spark-warehouse")
    val spark = SparkSession.builder().master("local").appName("DataConversion").getOrCreate()

    val df = spark.read.format("com.databricks.spark.csv")
      .option("quote", "\"")
      .option("escape", "\"")
      .option("delimiter", ",")
      .option("header", "true")
      .option("mode", "FAILFAST")
      .option("inferSchema","true")
      .load("file:///C:/Users/an/Desktop/ct_temp.csv")

    val finalDf = df.select(df("history_temp_id").as("NUM"),df("history_temp_time").as("TIME"))

    val jsonData = finalDf.select("NUM", "TIME").toJSON
    jsonData.show(2)


    // COSMOS DB Write configuration

    val writeConfig = Config(Map(
      "Endpoint" -> "https://cosms.documents.azure.com:443/",
      "Masterkey" -> "YOUR-KEY-HERE",  //provided primary key
      "Database" -> "DBName", //provided with DB name
      "Collection" -> "Collection", //provided with collection name
      ))

    // Write to Cosmos DB from the DataFrame
    import org.apache.spark.sql.SaveMode
    jsonData.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)

  }

Ниже приведен файл build.sbt

scalaVersion := "2.11.8"

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "com.databricks" %% "spark-csv" % "1.5.0",

)
libraryDependencies += "com.microsoft.azure" % "azure-cosmosdb-spark_2.2.0_2.11" % "1.1.1" % "provided" exclude("org.apache.spark", "spark-core_2.10")

Добавлена ​​зависимость cosmosDB в файл build.sbt.

Я новичок в Spark и Scala. пожалуйста, дайте мне знать, что нужно сделать, чтобы соединиться с космической БД от intelliJ с помощью spark и scala?

Сборка прошла успешно, но я получаю ошибку ниже при выполнении кода.

19/07/10 16:32:41 INFO DocumentClient: Initializing DocumentClient with serviceEndpoint [https://cosms.documents.azure.com/], ConnectionPolicy [ConnectionPolicy [requestTimeout=60, mediaRequestTimeout=300, connectionMode=Gateway, mediaReadMode=Buffered, maxPoolSize=400, idleConnectionTimeout=60, userAgentSuffix= SparkConnector/2.2.0_2.11-1.1.1, retryOptions=com.microsoft.azure.documentdb.RetryOptions@1ef5cde4, enableEndpointDiscovery=true, preferredLocations=[Japan East]]], ConsistencyLevel [Session]
19/07/10 16:33:03 WARN DocumentClient: Failed to retrieve database account information. org.apache.http.conn.HttpHostConnectException: Connect to cosms.documents.azure.com:443 [cosms.documents.azure.com/13.78.51.35] failed: Connection timed out: connect
......
Exception in thread "main" java.lang.IllegalStateException: Http client execution failed.
    at com.microsoft.azure.documentdb.internal.GatewayProxy.performGetRequest(GatewayProxy.java:244)
    at com.microsoft.azure.documentdb.internal.GatewayProxy.doRead(GatewayProxy.java:93)

Ответы [ 2 ]

0 голосов
/ 11 июля 2019

Ниже приведены файл build.sbt и объектный файл, используемые для чтения csv-файла с локального компьютера, преобразования его в json, подключения и выгрузки данных в базу данных cosmos.

ниже приведен файл build.sbt:

name := "Test"

version := "0.1"

scalaVersion := "2.11.8"

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "com.databricks" %% "spark-csv" % "1.5.0"
)
libraryDependencies += "joda-time" % "joda-time" % "2.10.3"
libraryDependencies += "com.fasterxml.uuid" % "java-uuid-generator" % "3.1.4"
libraryDependencies += "org.apache.httpcomponents" % "httpclient" % "4.5.3"
libraryDependencies += "org.json" % "json" % "20180813"
libraryDependencies += "com.google.guava" % "failureaccess" % "1.0.1"
libraryDependencies += "org.checkerframework.annotatedlib" % "guava" % "28.0-jre"
excludeDependencies += "com.google.guava" % "guava"

Ниже находится объектный файл:


    package hive.dataload

    import org.apache.spark.sql.{SparkSession,SaveMode}
    import com. microsoft.azure.cosmosdb.spark.schema._
    import com.microsoft.azure.cosmosdb.spark.config.Config


    object DataLoadConversion {


      def main(args: Array[String]): Unit = {

        System.setProperty("spark.sql.warehouse.dir", "file:///C:/spark-warehouse")
        val spark = 
    SparkSession.builder().master("local").appName("DataConversion").getOrCreate()

        val df = spark.read.format("com.databricks.spark.csv")
          .option("quote", "\"")
          .option("escape", "\"")
          .option("delimiter", ",")
          .option("header", "true")
          .option("mode", "FAILFAST")
          .option("inferSchema","true")
          .load("file:///C:/Users/an/Desktop/ct_temp.csv")

        df.show(5)


        val finalDf = 
     df.select(df("history_temp_id").as("NUM"),df("history_temp_time").as("TIME"))

        finalDf.show(3)

        val jsonData = finalDf.select("NUM", "TIME").toJSON
        jsonData.show()


        // COSMOS DB Write configuration
        val writeConfig = Map(
          "Endpoint" -> "https://cosms.documents.azure.com:443/",
          "Masterkey" -> "MASTER_KEY",
          "Database" -> "DB_NAME",
          "Collection" -> "COLLECTION_NAME",
          "preferredRegions" -> "REGION",
          "Upsert" -> "true"
        )
        val config = Config(writeConfig)

        // Write to Cosmos DB from the DataFrame
        jsonData.write.mode(SaveMode.Overwrite).cosmosDB(config)

        spark.stop()

      }
    }

0 голосов
/ 11 июля 2019

Вышеуказанная ошибка произошла из-за настроек прокси. Если вы подключаетесь из своей личной сети / сети без настроек прокси-сервера, ошибка тайм-аута этого подключения не будет получена.

...