Спарк груп по выпуску - PullRequest
0 голосов

Попытка написать простую программу с spqrk. Я должен сгруппировать свои данные по одному атрибуту myStruct - LogData:

public class LogData {

    public String           m_Host;
    public String           m_Timestamp;
    public String           m_Request;
    public Integer          m_Reply;
    public String           m_ColumnByteReply;
}

Что я пробовал:

JavaPairRDD <String, Iterable<LogData>> tmp = parsedData.groupBy(logData -> logData.m_Host);

JavaPairRDD<String, Iterable<LogData>> groupMap = parsedData.groupBy(new Function<LogData, String>() {
            @Override
            public String call(LogData logData) throws Exception {
                return logData.m_Host;
            }
        });

и просто:

JavaPairRDD <String, Iterable<LogData>> tmp = parsedData.groupBy(logData -> logData.m_Host);

И когда я пытаюсь вывести resultData, моя программа не работает.

Ошибки:

18/05/13 20:51:32 ОШИБКА Исполнитель: Исключение в задании 0.0 на этапе 1.0 (TID 1) java.io.NotSerializableException: LogData в java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1184) в java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548) в java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509) в java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432) в java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178) в java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:348) в org.apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala: 42) в org.apache.spark.storage.DiskBlockObjectWriter.write (BlockObjectWriter.scala: 195) в org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles (ExternalSorter.scala: 370) в org.apache.spark.util.collection.ExternalSorter.insertAll (ExternalSorter.scala: 211) в org.apache.spark.shuffle.sort.SortShuffleWriter.write (SortShuffleWriter.scala: 65) в org.apache.spark.scheduler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 68) в org.apache.spark.scheduler.ShuffleMapTask.runTask (ShuffleMapTask.scala: 41) в org.apache.spark.scheduler.Task.run (Task.scala: 56) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 196) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) на java.lang.Thread.run (Thread.java:748) 18.05.13 20:51:32 ОШИБКА TaskSetManager: Задача 0.0 на этапе 1.0 (TID 1) привела к не сериализуемому результату: LogData; не повторять 18.05.13 20:51:32 INFO TaskSchedulerImpl: Удален TaskSet 1.0, все задачи которого выполнены, из пула 18/05/13 20:51:32 INFO TaskSchedulerImpl: отмена этапа 1 18/05/13 20:51:32 ИНФОРМАЦИЯ DAGScheduler: сбой задания 1: считать в WordCount.java:92, заняло 0,088518 с org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 0.0 на этапе 1.0 (TID 1) привела к не сериализуемому результату: LogData в org.apache.spark.scheduler.DAGScheduler.org $ apache $ spark $ scheduler $ DAGScheduler $$ failJobAndIndependentStages (DAGScheduler.scala: 1214) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala: 1203) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala: 1202) в scala.collection.mutable.ResizableArray $ class.foreach (ResizableArray.scala: 59) в scala.collection.mutable.ArrayBuffer.foreach (ArrayBuffer.scala: 47) в org.apache.spark.scheduler.DAGScheduler.abortStage (DAGScheduler.scala: 1202) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.apply (DAGScheduler.scala: 696) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.apply (DAGScheduler.scala: 696) в scala.Option.foreach (Option.scala: 236) в org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed (DAGScheduler.scala: 696) в org.apache.spark.scheduler.DAGSchedulerEventProcessActor $$ anonfun $ receive $ 2.applyOrElse (DAGScheduler.scala: 1420) at akka.actor.Actor $ class.aroundReceive (Actor.scala: 465) в org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive (DAGScheduler.scala: 1375) в akka.actor.ActorCell.receiveMessage (ActorCell.scala: 516) в akka.actor.ActorCell.invoke (ActorCell.scala: 487)в akka.dispatch.Mailbox.processMailbox (Mailbox.scala: 238) в akka.dispatch.Mailbox.run (Mailbox.scala: 220) в akka.dispatch.ForkJoinExecutorConfigurator $ AkkaForkJoinTask.exec (AbstractDispatcher.scala: 393) в scala.concurrent.forkjoin.ForkJoinTask.doExec (ForkJoinTask.java:260) в scala.concurrent.forkjoin.ForkJoinPool $ WorkQueue.runTask (ForkJoinPool.java:1339) на scala.concurrent.forkjoin.ForkJoinPool.runWorker (ForkJoinPool.java:1979) в scala.concurrent.forkjoin.ForkJoinWorkerThread.run (ForkJoinWorkerThread.java:107)

Помогите :) Спасибо за ваши ответы!

1 Ответ

0 голосов
/ 13 мая 2018

Изменить

public class LogData {
    public String           m_Host;
    public String           m_Timestamp;
    public String           m_Request;
    public Integer          m_Reply;
    public String           m_ColumnByteReply;
}

на

public class LogData implements Serializable  {
    public String           m_Host;
    public String           m_Timestamp;
    public String           m_Request;
    public Integer          m_Reply;
    public String           m_ColumnByteReply;
}
...