Я Scala разработчик и в настоящее время изучаю Python, на самом деле PySpark. Чтобы проверить подключение от Spark к Hive в Scala, нам просто нужен метод enableHiveSupport (), пока я создаю свой объект sparkSession. Версия Spark: 2.3.1, Python: 2.7.5
Например, объект Spark в одной из моих программ Scala -Spark выглядит так:
val spark = SparkSession.builder().config(conf).master("yarn").config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").enableHiveSupport().getOrCreate()
После этого Я сразу использовал объект spark для запроса таблиц кустов, просто указав их в spark. sql или в любых API-интерфейсах с данными, которые я использовал для чтения таблиц кустов.
Теперь я проверяю Python подключение к Spark с использованием того же кода выше. У меня установлен HDP 3.0 на локальной машине и я тестирую подключение Spark с Python, как показано ниже.
from pyspark import SparkConf
from pyspark.sql import SparkSession
def get_interface_log_ids(spark):
idf = spark.read.format("jdbc").option("url","jdbc:postgresql://localhost:5432/mydb").option("user", "myuser").option("password", "mypass").option("driver", "o
rg.postgresql.Driver").option("dbtable","(select name, emplocation, empid, salary, credit, rownum from emptable) as controlTableDF").load()
print("Connection Established")
count = idf.count()
print("Count: {}".format(count))
idf.show()
idf.write.insertInto('emptable')
if __name__ == "__main__":
sparkConf = SparkConf().setAppName("DummyAP").set("spark.jars","/root/jars/postgresql-42.1.1.jar,/root/jars/postgresql-9.4.1212.jar")
spark = SparkSession.builder.config(conf=sparkConf).config("spark.sql.warehouse.dir", '/apps/hive/warehouse').enableHiveSupport().getOrCreate()
get_interface_log_ids(spark)
Я запускаю приведенный выше скрипт с командой spark-submit:
spark-submit --master=yarn --deploy-mode=client --driver-class-path /root/jars/postgresql-9.4.1212.jar:/root/jars/pos
tgresql-42.1.1.jar --conf spark.jars=/root/jars/postgresql-9.4.1212.jar,/root/jars/postgresql-42.1.1.jar --num-executors 1 --executor-cores 1 --executor-memory 1g --dr
iver-memory 1g --driver-cores 2 --name "COUNT" /root/pytest/check.py
Данные извлекаются, я вижу счетчик, способный печатать извлеченные данные из таблицы postgres, указанной в запросе, но вставка блока данных idf.write.insertInto ("empdb.emptable") в таблицу кустов завершается неудачно как говорится в сообщении об ошибке:
Traceback (most recent call last):
File "/root/pytest/check.py", line 15, in <module>
get_interface_log_ids(spark)
File "/root/pytest/check.py", line 11, in get_interface_log_ids
idf.write.insertInto('emptable')
File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 716, in insertInto
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: u'Table or view not found: empdb.emptable;'
У меня есть точная таблица, созданная в базе данных empdb, как показано ниже:
Я пытался передать hive-site. xml из --files в spark-submit, как показано ниже
SPARK_MAJOR_VERSION=2 spark-submit --master=yarn --deploy-mode=client --driver-class-path /root/jars/postgresql-9.4.1212.jar:/root/jars/pos
tgresql-42.1.1.jar --conf spark.jars=/root/jars/postgresql-9.4.1212.jar,/root/jars/postgresql-42.1.1.jar --files /etc/hive/conf/hive-site.xml --num-executors 1 --execu
tor-cores 1 --executor-memory 1g --driver-memory 1g --driver-cores 2 --name "COUNT" /root/pytest/check.py
Но все равно вижу ту же ошибку. Нужно ли создавать HiveCon