How to read SQL files in PySpark?

I was trying to run the code below in PySpark, expecting it to create a table from an SQL file that contains the table schema and the values. I could not understand the error. Please help me.

-------------------- SQL FILE ------------------------

CREATE TABLE IF NOT EXISTS `user_details` (
  `user_id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(255) DEFAULT NULL,
  `first_name` varchar(50) DEFAULT NULL,
  `last_name` varchar(50) DEFAULT NULL,
  `gender` varchar(10) DEFAULT NULL,
  `password` varchar(50) DEFAULT NULL,
  `status` tinyint(10) DEFAULT NULL,
  PRIMARY KEY (`user_id`)
) ENGINE=MyISAM  DEFAULT CHARSET=latin1 AUTO_INCREMENT=10001 ;

--
-- Dumping data for table `user_details`
--

INSERT INTO `user_details` (`user_id`, `username`, `first_name`, `last_name`, `gender`, `password`, `status`) VALUES
(1, 'rogers63', 'david', 'john', 'Fe ;male', 'e6a33eee180b07e563d74fee8c2c66b8', 1),
(2, 'mike28', 'rogers', 'paul', 'Male', '2e7dc6b8a1598f4f75c3eaa47958ee2f', 1),
(3, 'rivera92', 'david', 'john', 'Male', '1c3a8e03f448d211904161a6f5849b68', 1),
(4, 'ross95', 'maria', 'sanders', 'Male', '62f0a68a4179c5cdd997189760cbcf18', 1),
(5, 'paul85', 'morris', 'miller', 'Female', '61bd060b07bddfecccea56a82b850ecf', 1),
(6, 'smith34', 'daniel', 'michael', 'Female', '7055b3d9f5cb2829c26cd7e0e601cde5', 1),
(7, 'james84', 'sanders', 'paul', 'Female', 'b7f72d6eb92b45458020748c8d1a3573', 1),
(8, 'daniel53', 'mark', 'mike', 'Male', '299cbf7171ad1b2967408ed200b4e26c', 1),
(9, 'brooks80', 'morgan', 'maria', 'Female', 'aa736a35dc15934d67c0a999dccff8f6', 1),
(10, 'morgan65', 'paul', 'miller', 'Female', '#a28dca31f5aa5792e1cefd1dfd098569', 1);

Thanks in advance.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("asdasd").set("spark.driver.memory", "1g")
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

# Read the whole .sql file and pass its contents to Spark as one query
with open("/home/data/Downloads/Sample-SQL-File-10-Rows.sql") as fr:
    query = fr.read()
results = sqlContext.sql(query)

This is what I tried, but I got the following error:

Py4JJavaError                             Traceback (most recent call last)
~/vishal/lib/python3.5/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

~/vishal/lib/python3.5/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o24.sql.
: org.apache.spark.sql.AnalysisException: Hive support is required to CREATE Hive TABLE (AS SELECT);;
'CreateTable `employee`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Ignore

    at org.apache.spark.sql.execution.datasources.HiveOnlyCheck$$anonfun$apply$12.apply(rules.scala:392)
    at org.apache.spark.sql.execution.datasources.HiveOnlyCheck$$anonfun$apply$12.apply(rules.scala:390)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:117)
    at org.apache.spark.sql.execution.datasources.HiveOnlyCheck$.apply(rules.scala:390)
    at org.apache.spark.sql.execution.datasources.HiveOnlyCheck$.apply(rules.scala:388)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$2.apply(CheckAnalysis.scala:386)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$2.apply(CheckAnalysis.scala:386)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:386)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

AnalysisException                         Traceback (most recent call last)
<ipython-input-23-16ed5164dfb1> in <module>
      1 with open("/home/data/Downloads/Sample-SQL-File-10-Rows.sql") as fr:
      2     query = fr.read()
----> 3 results = sqlContext.sql(query)

~/vishal/lib/python3.5/site-packages/pyspark/sql/context.py in sql(self, sqlQuery)
    356         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    357         """
--> 358         return self.sparkSession.sql(sqlQuery)
    359 
    360     @since(1.0)

~/vishal/lib/python3.5/site-packages/pyspark/sql/session.py in sql(self, sqlQuery)
    765         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    766         """
--> 767         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    768 
    769     @since(2.0)

~/vishal/lib/python3.5/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

~/vishal/lib/python3.5/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: "Hive support is required to CREATE Hive TABLE (AS SELECT);;\n'CreateTable `employee`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Ignore\n"
...
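
The traceback points at two separate problems. First, sqlContext.sql() (like spark.sql()) executes exactly one SQL statement, so passing the contents of a whole file as a single string cannot work. Second, in Spark 2.x a plain CREATE TABLE without a USING clause is parsed as a Hive-format table, and the analyzer raises "Hive support is required to CREATE Hive TABLE" when the underlying session was created without Hive support. Below is a minimal sketch of the mechanics of one possible approach, assuming the statements in the file have first been rewritten in Spark-compatible SQL (MySQL-specific pieces such as int(11), AUTO_INCREMENT, ENGINE=MyISAM and DEFAULT CHARSET are not valid Spark SQL); the file path is the one from the question:

from pyspark.sql import SparkSession

# Build a session with Hive support, which is what the AnalysisException
# asks for (this requires a Spark build that includes Hive, as the
# standard binary distributions do).
spark = SparkSession.builder \
    .appName("run-sql-file") \
    .enableHiveSupport() \
    .getOrCreate()

# spark.sql() runs exactly one statement, so split the script on ';'.
# NOTE: this naive split breaks on string literals that contain ';'
# (the sample data above has 'Fe ;male'), so a real script would need
# a proper SQL statement splitter.
with open("/home/data/Downloads/Sample-SQL-File-10-Rows.sql") as fr:
    statements = [s.strip() for s in fr.read().split(";") if s.strip()]

for stmt in statements:
    spark.sql(stmt)

This is only a sketch, not a robust script. For a real MySQL dump it is usually simpler to load the dump into MySQL itself and then read the table into Spark through the JDBC data source (spark.read.format("jdbc")), which avoids rewriting the dialect-specific SQL by hand.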