Мне нужно выгрузить данные json в cosmosDB из фрейма данных spark, используя scala и intelliJ.
Я читаю CSV-файл с моего локального компьютера и преобразовываю его в формат JSON. Теперь мне нужно выгрузить эти данные json в коллекцию cosmosDB.
Версия Spark 2.2.0 и версия scala 2.11.8
Ниже приведен код, который я написал в IntelliJ с помощью scala для извлечения файла csv с моего локального компьютера и преобразования его в файл json.
import org.apache.spark.sql.SparkSession
import com.microsoft.azure.cosmosdb.spark.config.Config
object DataLoadConversion {
def main(args: Array[String]): Unit = {
System.setProperty("spark.sql.warehouse.dir", "file:///C:/spark-warehouse")
val spark = SparkSession.builder().master("local").appName("DataConversion").getOrCreate()
val df = spark.read.format("com.databricks.spark.csv")
.option("quote", "\"")
.option("escape", "\"")
.option("delimiter", ",")
.option("header", "true")
.option("mode", "FAILFAST")
.option("inferSchema","true")
.load("file:///C:/Users/an/Desktop/ct_temp.csv")
val finalDf = df.select(df("history_temp_id").as("NUM"),df("history_temp_time").as("TIME"))
val jsonData = finalDf.select("NUM", "TIME").toJSON
jsonData.show(2)
// COSMOS DB Write configuration
val writeConfig = Config(Map(
"Endpoint" -> "https://cosms.documents.azure.com:443/",
"Masterkey" -> "YOUR-KEY-HERE", //provided primary key
"Database" -> "DBName", //provided with DB name
"Collection" -> "Collection", //provided with collection name
))
// Write to Cosmos DB from the DataFrame
import org.apache.spark.sql.SaveMode
jsonData.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)
}
Ниже приведен файл build.sbt
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"com.databricks" %% "spark-csv" % "1.5.0",
)
libraryDependencies += "com.microsoft.azure" % "azure-cosmosdb-spark_2.2.0_2.11" % "1.1.1" % "provided" exclude("org.apache.spark", "spark-core_2.10")
Добавлена зависимость cosmosDB в файл build.sbt.
Я новичок в Spark и Scala. пожалуйста, дайте мне знать, что нужно сделать, чтобы соединиться с космической БД от intelliJ с помощью spark и scala?
Сборка прошла успешно, но я получаю ошибку ниже при выполнении кода.
19/07/10 16:32:41 INFO DocumentClient: Initializing DocumentClient with serviceEndpoint [https://cosms.documents.azure.com/], ConnectionPolicy [ConnectionPolicy [requestTimeout=60, mediaRequestTimeout=300, connectionMode=Gateway, mediaReadMode=Buffered, maxPoolSize=400, idleConnectionTimeout=60, userAgentSuffix= SparkConnector/2.2.0_2.11-1.1.1, retryOptions=com.microsoft.azure.documentdb.RetryOptions@1ef5cde4, enableEndpointDiscovery=true, preferredLocations=[Japan East]]], ConsistencyLevel [Session]
19/07/10 16:33:03 WARN DocumentClient: Failed to retrieve database account information. org.apache.http.conn.HttpHostConnectException: Connect to cosms.documents.azure.com:443 [cosms.documents.azure.com/13.78.51.35] failed: Connection timed out: connect
......
Exception in thread "main" java.lang.IllegalStateException: Http client execution failed.
at com.microsoft.azure.documentdb.internal.GatewayProxy.performGetRequest(GatewayProxy.java:244)
at com.microsoft.azure.documentdb.internal.GatewayProxy.doRead(GatewayProxy.java:93)