искра не JSON для JSON и ошибки кадра данных - PullRequest
0 голосов
/ 31 августа 2018

У меня есть файл типа json (не настоящая структура json), но я преобразовал его в json и прочитал спарк-чтение json (у нас в spark 1.6.0), я пока не могу использовать многострочную функцию из spark 2. Он отображает результаты, но в то же время выдает ошибку. Любая помощь с благодарностью.

У меня есть такой документ ... взят только один пример, но это массив:

$result = [
            {
              'name' => 'R-2018:1583',
              'issue_date' => '2018-05-17 02:51:06',
              'type' => 'Product Enhancement Advisory', 
              'last_modified_date' => '2018-05-17 03:51:00',
              'id' => 273,
              'update_date' => '2018-05-17 02:51:06',
              'synopsis' => ' enhancement  update',
              'advory' => 'R:1583'
            }
                ]

Я использовал так:

jsonRDD = sc.wholeTextFiles("/user/xxxx/aa.json").map(lambda x: x[1]).map(lambda x:x.replace('$result =','')).map(lambda x: x.replace("'",'"')).map(lambda x:x.replace("\n","")).map(lambda x:x.replace("=>",":")).map(lambda x:x.replace("  ",""))
sqlContext.read.json(rdd).show() 

Он отображает результаты, но я также получаю сообщение об ошибке ниже, пожалуйста, помогите в этом.

18/08/31 11:19:30 WARN util.ExecutionListenerManager: Ошибка выполнения прослушивателя выполнения запроса java.lang.ArrayIndexOutOfBoundsException: 0 в org.apache.spark.sql.query.analysis.QueryAnalysis $$ anonfun $ getInputMetadata $ 2.apply (QueryAnalysis.scala: 121) в org.apache.spark.sql.query.analysis.QueryAnalysis $$ anonfun $ getInputMetadata $ 2.apply (QueryAnalysis.scala: 108) в scala.collection.LinearSeqOptimized $ class.foldLeft (LinearSeqOptimized.scala: 111) в scala.collection.immutable.List.foldLeft (List.scala: 84) в org.apache.spark.sql.query.analysis.QueryAnalysis $ .getInputMetadata (QueryAnalysis.scala: 108) в com.cloudera.spark.lineage.ClouderaNavigatorListener.writeQueryMetadata (ClouderaNavigatorListener.scala: 74) в com.cloudera.spark.lineage.ClouderaNavigatorListener.onSuccess (ClouderaNavigatorListener.scala: 54) в org.apache.spark.sql.util.ExecutionListenerManager $$ anonfun $ onSuccess $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (QueryExecutionListener.scala: 100) в org.apache.spark.sql.util.ExecutionListenerManager $$ anonfun $ onSuccess $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (QueryExecutionListener.scala: 99) в org.apache.spark.sql.util.ExecutionListenerManager $$ anonfun $ org $ apache $ spark $ sql $ util $ ExecutionListenerManager $$ withErrorHandling $ 1.apply (QueryExecutionListener.scala: 121) в org.apache.spark.sql.util.ExecutionListenerManager $$ anonfun $ org $ apache $ spark $ sql $ util $ ExecutionListenerManager $$ withErrorHandling $ 1.apply (QueryExecutionListener.scala: 119) в scala.collection.immutable.List.foreach (List.scala: 318) в scala.collection.generic.TraversableForwarder $ class.foreach (TraversableForwarder.scala: 32) в scala.collection.mutable.ListBuffer.foreach (ListBuffer.scala: 45) в org.apache.spark.sql.util.ExecutionListenerManager.org $ apache $ spark $ sql $ util $ ExecutionListenerManager $$ withErrorHandling (QueryExecutionListener.scala: 119) в org.apache.spark.sql.util.ExecutionListenerManager $$ anonfun $ onSuccess $ 1.apply $ mcV $ sp (QueryExecutionListener.scala: 99) в org.apache.spark.sql.util.ExecutionListenerManager $$ anonfun $ onSuccess $ 1.apply (QueryExecutionListener.scala: 99) в org.apache.spark.sql.util.ExecutionListenerManager $$ anonfun $ onSuccess $ 1.apply (QueryExecutionListener.scala: 99) в org.apache.spark.sql.util.ExecutionListenerManager.readLock (QueryExecutionListener.scala: 132) в org.apache.spark.sql.util.ExecutionListenerManager.onSuccess (QueryExecutionListener.scala: 98) в org.apache.spark.sql.DataFrame.withCallback (DataFrame.scala: 2116) в org.apache.spark.sql.DataFrame.head (DataFrame.scala: 1389) в org.apache.spark.sql.DataFrame.take (DataFrame.scala: 1471) в org.apache.spark.sql.DataFrame.showString (DataFrame.scala: 184) at sun.reflect.GeneratedMethodAccessor55.invoke (неизвестный источник) at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke (Method.java:606) на py4j.reflection.MethodInvoker.invoke (MethodInvoker.java:231) в py4j.reflection.ReflectionEngine.invoke (ReflectionEngine.java:381) at py4j.Gateway.invoke (Gateway.java:259) на py4j.commands.AbstractCommand.invokeMethod (AbstractCommand.java:133) на py4j.commands.CallCommand.execute (CallCommand.java:79)at py4j.GatewayConnection.run (GatewayConnection.java:209) at java.lang.Thread.run (Thread.java:745)

1 Ответ

0 голосов
/ 01 сентября 2018
Функция

json принимает путь к файлу json в качестве параметра, поэтому сначала необходимо сохранить где-нибудь json, а затем прочитать этот файл.

Нечто подобное должно работать

jsonRDD = sc.wholeTextFiles("/user/xxxx/aa.json")
            .map(lambda x: x[1])
            .map(lambda x:x.replace('$result =',''))
            .map(lambda x: x.replace("'",'"'))
            .map(lambda x:x.replace("\n",""))
            .map(lambda x:x.replace("=>",":"))
            .map(lambda x:x.replace("  ",""))
            .saveAsTextFile("/user/xxxx/aa_transformed.json") 
sqlContext.read.json(jsonRDD).show() 
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...