Я написал этот код и запустил в кластере с spark-submit: SUCCESS
Когда я делаю это в лотке в Job (QUE)
[Create Job][https://yadi.sk/i/GyOIRRsv_jVs-Q]
Я улавливаю ошибку NoSuchTableException:
2018-12-04 12:48:19,244 [main] WARN org.apache.hadoop.hive.metastore.ObjectStore - Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.1.0-cdh5.15.1
2018-12-04 12:48:19,408 [main] WARN org.apache.hadoop.hive.metastore.ObjectStore - Failed to get database default, returning NoSuchObjectException
Сбой запуска Oozie, основной класс [org.apache.oozie.action.hadoop.SparkMain], исключение main (), null org.apache.spark.sql.catalyst.analysis.NoSuchTableException
Я думаю, что это произошло, потому что мне не нужно подключаться к Hive MetaStore.
Затем я добавил:
hiveContext.setConf("hive.metastore.uris", "thrift://***.***.***.***:9083")
и все-таки перехватил ошибку ...
Я использую:
- Claudera CDN 5.15.1
- Spark 1.6.0
- Scala 2.10.6
- QUE
Код
package ShowCase
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext
object TotalCalls {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Spark Program")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
hiveContext.setConf("hive.metastore.uris", "thrift://***.***.***.***:9083")
val source = hiveContext.table("cdr.subs_cdr_nrm")
import hiveContext.implicits._
val result = {
source.filter("call_type = 1 or call_type = 2 or call_type = 29 or call_type = 43")
.select(from_unixtime(((floor(unix_timestamp($"start_time") / 900)) * 900), "yyyy-MM-dd HH:mm:ss").as("interval"), $"cellid", $"duration")
.groupBy("interval", "cellid")
.agg(count("*").as("total_calls"))
}
result.write.mode(SaveMode.Overwrite).saveAsTable("cdr.call_result")
sc.stop()
}
}