Запуск искры в oozie с sqlContext - PullRequest
0 голосов
/ 20 июня 2019

Я пытаюсь запустить искорку в рабочем процессе Oozie.он работает нормально, пока я не пытаюсь получить доступ к внешней таблице улья через искру.Мой рабочий процесс:

<action name="SparkJob">
    <spark xmlns="uri:oozie:spark-action:0.1">
        <job-tracker>${jobtracker}</job-tracker>
        <name-node>${namenode}</name-node>
            <configuration>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
            </configuration>
            <master>yarn-cluster</master>
            <name>name</name>
            <class>classname</class>
            <jar>jar file</jar>
            <spark-opts>--conf spark.yarn.queue=${queuename_nonp} --conf spark.ui.port=5050</spark-opts>
    </spark>
    <ok to="EMAIL_SUCCESS"/>
    <error to="EMAIL_FAILURE"/>
</action>

После сбоя задания Oozie я проверил журнал пряжи с помощью идентификатора приложения, указанного в выходных данных Oozie.упомянутая ошибка -


2019-06-20 17:14:56,602 [Driver] ERROR org.apache.spark.deploy.yarn.ApplicationMaster  - User class threw exception: java.lang.RuntimeException: [1.1] failure: ``with'' expected but identifier use found

use instance_name
^
java.lang.RuntimeException: [1.1] failure: ``with'' expected but identifier use found

use instance_name.
^

Код, на который ссылается эта ошибка, находится ниже искровых запросов.Я использую эти искровые запросы в качестве jar в рабочем процессе.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.{from_unixtime, regexp_replace}
import org.apache.spark.sql.DataFrameNaFunctions

object loadHiveTable {
def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("spark-transformation-001")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val df = sqlContext.read.format("com.databricks.spark.avro").load("avro_file")
    val temptable = "temp_emp"
    df4.registerTempTable(temptable)
    val tempquery = "select * from temp_emp"
    val result = sqlContext.sql(tempquery)
    result.show()
    val setQuery = "use instance_name"
    sqlContext.sql(setQuery)
    val queryCreateTable = "CREATE EXTERNAL TABLE IF NOT EXISTS EMPLOYEES_Spark(\n   EMPLOYEE_ID INT,\n   FIRST_NAME STRING,\n   LAST_NAME STRING,\n   EMAIL STRING,\n   PHONE_NUMBER STRING,\n   HIRE_DATE DATE,\n   JOB_ID STRING,\n   SALARY DECIMAL(8,2),\n   COMMISSION_PCT DECIMAL(2,2),\n   MANAGER_ID INT\n   )\n   PARTITIONED BY (DEPARTMENT_ID INT)\n   LOCATION 'path'\n   tblproperties (\"`skip.header.line.count`\"=\"1\")"
    sqlContext.sql(queryCreateTable)
    sqlContext.setConf("hive.exec.dynamic.partition","true")
    sqlContext.setConf("hive.exec.dynamic.partition.mode","nonstrict")
    val insertQuery = "insert overwrite table employees_spark partition (department_id) select * from temp_emp"
    sqlContext.sql(insertQuery)
}
}

Я хочу знать, что с этим не так, когда я запускаю как файл jar в Oozie.

Я выполняю весь искровой запрос в spark shell и получаю правильный результат до финальногошаг.Я могу просмотреть вводимые данные во внешнюю таблицу улья.

1 Ответ

0 голосов
/ 21 июня 2019

вам нужно использовать HiveContext вместо SQLContext.

...