Попытка написать простую программу с spqrk. Я должен сгруппировать свои данные по одному атрибуту myStruct - LogData:
public class LogData {
public String m_Host;
public String m_Timestamp;
public String m_Request;
public Integer m_Reply;
public String m_ColumnByteReply;
}
Что я пробовал:
JavaPairRDD <String, Iterable<LogData>> tmp = parsedData.groupBy(logData -> logData.m_Host);
JavaPairRDD<String, Iterable<LogData>> groupMap = parsedData.groupBy(new Function<LogData, String>() {
@Override
public String call(LogData logData) throws Exception {
return logData.m_Host;
}
});
и просто:
JavaPairRDD <String, Iterable<LogData>> tmp = parsedData.groupBy(logData -> logData.m_Host);
И когда я пытаюсь вывести resultData, моя программа не работает.
Ошибки:
18/05/13 20:51:32 ОШИБКА Исполнитель: Исключение в задании 0.0 на этапе 1.0
(TID 1)
java.io.NotSerializableException: LogData
в java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1184)
в java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548)
в java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509)
в java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432)
в java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178)
в java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:348)
в org.apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala: 42)
в org.apache.spark.storage.DiskBlockObjectWriter.write (BlockObjectWriter.scala: 195)
в org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles (ExternalSorter.scala: 370)
в org.apache.spark.util.collection.ExternalSorter.insertAll (ExternalSorter.scala: 211)
в org.apache.spark.shuffle.sort.SortShuffleWriter.write (SortShuffleWriter.scala: 65)
в org.apache.spark.scheduler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 68)
в org.apache.spark.scheduler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 41)
в org.apache.spark.scheduler.Task.run (Task.scala: 56)
в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 196)
в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624)
на java.lang.Thread.run (Thread.java:748)
18.05.13 20:51:32 ОШИБКА TaskSetManager: Задача 0.0 на этапе 1.0 (TID 1) привела к не сериализуемому результату: LogData; не повторять
18.05.13 20:51:32 INFO TaskSchedulerImpl: Удален TaskSet 1.0, все задачи которого выполнены, из пула
18/05/13 20:51:32 INFO TaskSchedulerImpl: отмена этапа 1
18/05/13 20:51:32 ИНФОРМАЦИЯ DAGScheduler: сбой задания 1: считать в WordCount.java:92, заняло 0,088518 с
org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 0.0 на этапе 1.0 (TID 1) привела к не сериализуемому результату: LogData
в org.apache.spark.scheduler.DAGScheduler.org $ apache $ spark $ scheduler $ DAGScheduler $$ failJobAndIndependentStages (DAGScheduler.scala: 1214)
в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala: 1203)
в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala: 1202)
в scala.collection.mutable.ResizableArray $ class.foreach (ResizableArray.scala: 59)
в scala.collection.mutable.ArrayBuffer.foreach (ArrayBuffer.scala: 47)
в org.apache.spark.scheduler.DAGScheduler.abortStage (DAGScheduler.scala: 1202)
в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.apply (DAGScheduler.scala: 696)
в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.apply (DAGScheduler.scala: 696)
в scala.Option.foreach (Option.scala: 236)
в org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed (DAGScheduler.scala: 696)
в org.apache.spark.scheduler.DAGSchedulerEventProcessActor $$ anonfun $ receive $ 2.applyOrElse (DAGScheduler.scala: 1420)
at akka.actor.Actor $ class.aroundReceive (Actor.scala: 465)
в org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive (DAGScheduler.scala: 1375)
в akka.actor.ActorCell.receiveMessage (ActorCell.scala: 516)
в akka.actor.ActorCell.invoke (ActorCell.scala: 487)в akka.dispatch.Mailbox.processMailbox (Mailbox.scala: 238)
в akka.dispatch.Mailbox.run (Mailbox.scala: 220)
в akka.dispatch.ForkJoinExecutorConfigurator $ AkkaForkJoinTask.exec (AbstractDispatcher.scala: 393)
в scala.concurrent.forkjoin.ForkJoinTask.doExec (ForkJoinTask.java:260)
в scala.concurrent.forkjoin.ForkJoinPool $ WorkQueue.runTask (ForkJoinPool.java:1339)
на scala.concurrent.forkjoin.ForkJoinPool.runWorker (ForkJoinPool.java:1979)
в scala.concurrent.forkjoin.ForkJoinWorkerThread.run (ForkJoinWorkerThread.java:107)
Помогите :)
Спасибо за ваши ответы!