Проблема: в настоящее время я пытаюсь прочитать текстовый файл с данными Json.Целью этого является подсчет различного числа пользователей по идентификатору пользователя, присутствующему в Json.Проблема под рукой: применение метода count () к RDD в python spark-throwing error прилагается.
нулевое значение, предоставляемое каждому разделу, уникально по сравнению с указанным для окончательного вызова сокращения
Код:
Шаг 1: я прочитал в файлев RDD с именем rdd_step1, используя
sc.textFile('filepath')
Шаг 2: создайте функцию, которая в основном считывает каждую строку и возвращает объект Json, если его допустимый набор данных.
def safe_parse(raw_json):
try:
jo = json.loads(raw_json)
if jo.get('created_at'):
return jo
else:
None
except:
return None
Шаг 3:Я создаю промежуточный rdd, где я анализирую строки и получаю json-объекты, чтобы создать (ключ, значение) rdd
rdd_step2 = rdd_step2.map( lambda x: safe_parse(x)).filter(lambda x: x is not None).map(lambda x: (x['user']['id_str'],1))
Шаг 4: Использовать уменьшение по ключу и получить значения
counts= rdd_step2.reduceByKey(lambda a, b: a + b)
Count = counts.count()
print(Count)
пример данных: {"TS": "Mon jan 1 00:00:00", "id": "123213", "text": "Очевидно, вам не разрешено иметь анти", "user":{"id": 4494956854, "id_str": "4494956854", "name": "Amy Smalley", "screen_name": "amyjosmalley", "location": null, "url": null, "description": "Stay-at-home мама "," protected ": ложь," проверено ": ложно," follow_count ": 11," friends_count ": 40," selected_count ": 0," favourites_count ": 116," statuses_count ": 23,"create_at ":" Вт 15 дек. 18:57:57 +0000 2015 "}}
Полное сообщение об ошибке:
Py4JJavaError Traceback (последний последний вызов) в () 4 #raise NotImplementedError () 5 count = rdd_step2.reduceByKey (lambda a, b:a + b) ----> 6 Count = countts.count () 7 print (Count) 8 # my_output.append ("num-unique-users", users_count)
C: / Users / srikanth/Downloads/spark-2.3.1-bin-hadoop2.7/spark-2.3.1-bin-hadoop2.7\python\pyspark\rdd.py in count (self) 1071 3 1072 "" "-> 1073 return self.mapPartitions (лямбда i: [сумма (1 для _ в i)]). sum () 1074 1075 def stats (self):
C: /Users/srikanth/Downloads/spark-2.3.1-bin-hadoop2.7 / spark-2.3.1-bin-hadoop2.7 \ python \ pyspark \ rdd.py в sum (self) 1062 6.0 1063 "" "-> 1064 return self.mapPartitions (lambda x: [sum (x)]). fold (0, operator.add) 1065 1066 def count (self):
C: /Users/srikanth/Downloads/spark-2.3.1-bin-hadoop2.7/spark-2.3.1-bin-hadoop2.7 \ python \ pyspark \ rdd.py in fold (self, zeroValue, op) 933 # zeroValue, предоставляемое каждому разделу, уникально из предоставленного раздела 934 # до окончательного вызова приведения -> 935 vals = self.mapPartitions (func) .collect () 936 возвращать приращение (op, vals, zeroValue) 937
C: / Users / srikanth / Downloads / spark-2.3.1-bin-hadoop2.7 / spark-2.3.1-bin-hadoop2.7 \ python \ pyspark \ rdd.py в collect (self) 832 "" "833 с SCCallSiteSync (self.context) в виде css:-> 834 sock_info = self.ctx._jvm.PythonRDD.collectAndServe (self._jrdd.rdd ()) 835 список возврата (_load_from_socket (sock_info, self._jrdd_deserializer)) 836
~ \ Downloads \ spark-2.3.1-bin-hadoop2.7 \ spark-2.3.1-bin-hadoop2.7 \ python \ lib \ py4j-0.10.7-src.zip \ py4j \ java_gateway.py в , вызов (self, * args) 1255 answer = self.gateway_client.send_command (команда) 1256 return_value = get_return_value (-> 1257 ответ, self.gateway_client, self.target_id, self.name) 1258 1259 для temp_arg в temp_args:
~ \ Downloads \ spark-2.3.1-bin-hadoop2.7 \ spark-2.3.1-bin-hadoop2.7 \ python \ lib \ py4j-0.10.7-src.zip \ py4j \ protocol.py в get_return_value(ответ, gateway_client, target_id, name) 326 повысить Py4JJavaError (327 «Произошла ошибка при вызове {0} {1} {2}. \ n».-> 328 формат (target_id, ".", Name), значение) 329 else: 330 повысить Py4JError (
Py4JJavaError: Произошла ошибка при вызове z: org.apache.spark.api.python.PythonRDD.collectAndServe.: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 1 на этапе 417.0 не выполнена 1 раз, самый последний сбой: потерянная задача 1.0 на этапе 417.0 (TID 462, localhost, драйвер исполнителя): java.io.FileNotFoundException: C: \ Users \ srikanth \ AppData \ Local \ Temp \ blockmgr-f7a58081-572f-4ea9-a53e-a84f5ebfb955 \ 0c \ temp_shuffle_36ab37dd-0f30-41c6-8f4f-ef1400d58ba8 (система не может найти указанный путь)..FileOutputStream.open0 (собственный метод) в java.io.FileOutputStream.open (неизвестный источник) в java.io.FileOutputStream. (Неизвестный источник) в org.apache.spark.storage.DiskBlockObjectWriter.initialize (DiskBlockObjectWriter.scala:в org.apache.spark.storage.DiskBlockObjectWriter.open (DiskBlockObjectWriter.scala: 116) в org.apache.spark.storage.DiskBlockObjectWriter.write (DiskBlockObjectWriter.scala: 237) в org.apache.sparkgeerh.write (BypassMergeSortShuffleWriter.java:151) в org.apache.spark.scheduler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 96) в org.apache.spark.scheduler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 53) в org.apache.spark.scheduler.Task.run (Task.scala: 109) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 345) в java.util.concurrent.ThreadPoolExecutor.runWorker (неизвестный источник) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (неизвестный источник) в java.lang.Thread.run (Неизвестный источник)
Отслеживание стека драйверов: в org.apache.spark.scheduler.DAGScheduler.org $ apache $ spark $ scheduler $ DAGScheduler $$ failJobAndIndependentStages (DAGScheduler.scala: 1602) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala: 1590) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala: 1589) в scala.collection.m.ResizableArray $ class.foreach (ResizableArray.scala: 59) в scala.collection.mutable.ArrayBuffer.foreach (ArrayBuffer.scala: 48) в org.apache.spark.scheduler.DAGScheduler.abortStage (DAGScheduler.scala: 1589 в)org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.apply (DAGScheduler.scala: 831) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.apply (DAGScheduler.scala.scala: scala: sc.foreach (Option.scala: 257) в org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed (DAGScheduler.scala: 831) в org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOn.oop.doOn.Reache (DAGSchegachesc.scareduchescher)..spark.(EventLoop.scala: 48) в org.apache.spark.scheduler.DAGScheduler.runJob (DAGScheduler.scala: 642) в org.apache.spark.SparkContext.runJob (SparkContext.scala: 2034) в org.apache.spark.SparkContext.runJob (SparkContext.scala: 2055) в org.apache.spark.SparkContext.runJob (SparkContext.scala: 2074) в org.apache.spark.SparkContext.runJob (SparkContext.scala: 2099) в org.apache.spark.rdd.RDD $$ anonfun $ collect $ 1.apply (RDD.scala: 939) в org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 151) в орг.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 112) в org.apache.spark.rdd.RDD.withScope (RDD.scala: 363) в org.apache.spark.rdd.RDD.collect (RDD.scala: 938) в org.apache.spark.api.python.PythonRDD $ .collectAndServe (PythonRDD.scala: 162) в org.apache.spark.api.python.PythonRDD.collectAndServe (PythonRDD.scala) в sun.ref.GeneratedMethodAccessor72.invoke (Неизвестный источник) в sun.reflect.DelegatingMethodAccessorImpl.invoke (Неизвестный источник) вjava.lang.reflect.Method.invoke (Неизвестный источник) в py4j.reflection.MethodInvoker.invoke (MethodInvoker.java:244) в py4j.reflection.ReflectionEngine.invoke (ReflectionEngine.java:357) в py4j.keateGateway.java:282) на py4j.commands.AbstractCommand.invokeMethod (AbstractCommand.java:132) на py4j.commands.CallCommand.execute (CallCommand.java:79) на py4j.GatewayConnection.run (GatewayConnection.java:238)java.lang.Thread.run (неизвестный источник). Вызывается: java.io.FileNotFoundException: C: \ Users \ srikanth \ AppData \ Local \ Temp \ blockmgr-f7a58081-572f-4ea9-a53e-a84f5ebfb955 \ 0c \ temp36shddle-41c6-8f4f-ef1400d58ba8 (система не может найти указанный путь) в java.io.FileOutputStream.open0 (собственный метод) в java.io.FileOutputStream.open (неизвестный источник) в java.io.FileOutputStream. (Неизвестный источник)в org.apache.spark.storage.DiskBlockObjectWriter.initialize (DiskBlockObjectWriter.scala: 103) в org.apache.spark.storage.DiskBlockObjectWriter.open (DiskBlockObjectWriter.scala: 116) вorg.runTask (ShuffleMapTask.scala: 96) в org.apache.spark.scheduler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 53) в org.apache.spark.scheduler.Task.run (Task.scgap: 109 at).spark.executor.Executor $ TaskRunner.run (Executor.scala: 345) в java.util.concurrent.ThreadPoolExecutor.runWorker (неизвестный источник) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (неизвестный источник) ...Еще 1