Я создал DummySource, который считывает строки из файла и преобразует их в объекты TaxiRide
. Проблема в том, что есть поля, соответствующие org.joda.time.DateTime
, где я использую org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
, и SparkStreaming не может сериализовать эти поля.
Как заставить SparkStreaming сериализовать их? Мой код ниже вместе с ошибкой.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.sense.spark.util.TaxiRideSource
object TaxiRideCountCombineByKey {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
.setAppName("TaxiRideCountCombineByKey")
.setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val stream = ssc.receiverStream(new TaxiRideSource())
stream.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
}
}
import java.io.{BufferedReader, FileInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import java.util.Locale
import java.util.zip.GZIPInputStream
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import org.joda.time.DateTime
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
case class TaxiRide(rideId: Long, isStart: Boolean, startTime: DateTime, endTime: DateTime,
startLon: Float, startLat: Float, endLon: Float, endLat: Float,
passengerCnt: Short, taxiId: Long, driverId: Long)
class TaxiRideSource extends Receiver[TaxiRide](StorageLevel.MEMORY_AND_DISK_2) {
val timeFormatter: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC()
val dataFilePath = "/home/flink/nycTaxiRides.gz";
val delayInNanoSeconds: Long = 1000000
def onStart() {new Thread("TaxiRide Source") {override def run() {receive()}}.start()}
def onStop() {}
private def receive() {
while (!isStopped()) {
val gzipStream = new GZIPInputStream(new FileInputStream(dataFilePath))
val reader: BufferedReader = new BufferedReader(new InputStreamReader(gzipStream, StandardCharsets.UTF_8))
var line: String = ""
while (reader.ready() && (line = reader.readLine()) != null) {
val startTime = System.nanoTime
// read the line on the file and yield the object
val taxiRide: TaxiRide = getTaxiRideFromString(line)
store(Iterator(taxiRide))
Thread.sleep(1000)
}
}
}
def getTaxiRideFromString(line: String): TaxiRide = {
// println(line)
val tokens: Array[String] = line.split(",")
if (tokens.length != 11) {
throw new RuntimeException("Invalid record: " + line)
}
val rideId: Long = tokens(0).toLong
val (isStart, startTime, endTime) = tokens(1) match {
case "START" => (true, DateTime.parse(tokens(2), timeFormatter), DateTime.parse(tokens(3), timeFormatter))
case "END" => (false, DateTime.parse(tokens(2), timeFormatter), DateTime.parse(tokens(3), timeFormatter))
case _ => throw new RuntimeException("Invalid record: " + line)
}
val startLon: Float = if (tokens(4).length > 0) tokens(4).toFloat else 0.0f
val startLat: Float = if (tokens(5).length > 0) tokens(5).toFloat else 0.0f
val endLon: Float = if (tokens(6).length > 0) tokens(6).toFloat else 0.0f
val endLat: Float = if (tokens(7).length > 0) tokens(7).toFloat else 0.0f
val passengerCnt: Short = tokens(8).toShort
val taxiId: Long = tokens(9).toLong
val driverId: Long = tokens(10).toLong
TaxiRide(rideId, isStart, startTime, endTime, startLon, startLat, endLon, endLat, passengerCnt, taxiId, driverId)
}
}
Ошибка такая:
20/06/17 11:27:26 ERROR TaskSetManager: Failed to serialize task 59, not attempting to retry it.
java.io.NotSerializableException: org.joda.time.format.DateTimeFormatter
Serialization stack:
- object not serializable (class: org.joda.time.format.DateTimeFormatter, value: org.joda.time.format.DateTimeFormatter@19bf2eee)
- field (class: org.sense.spark.util.TaxiRideSource, name: timeFormatter, type: class org.joda.time.format.DateTimeFormatter)
- object (class org.sense.spark.util.TaxiRideSource, org.sense.spark.util.TaxiRideSource@2fef7647)
- element of array (index: 0)
- array (class [Lorg.apache.spark.streaming.receiver.Receiver;, size 1)
- field (class: scala.collection.mutable.WrappedArray$ofRef, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.WrappedArray$ofRef, WrappedArray(org.sense.spark.util.TaxiRideSource@2fef7647))
- writeObject data (class: org.apache.spark.rdd.ParallelCollectionPartition)
- object (class org.apache.spark.rdd.ParallelCollectionPartition, org.apache.spark.rdd.ParallelCollectionPartition@107f)
- field (class: org.apache.spark.scheduler.ResultTask, name: partition, type: interface org.apache.spark.Partition)
- object (class org.apache.spark.scheduler.ResultTask, ResultTask(59, 0))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:472)
at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:453)
at scala.Option.map(Option.scala:146)
at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:453)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:295)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:290)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:375)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:373)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:373)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:370)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:370)
at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:85)
at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:64)
at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
20/06/17 11:27:26 INFO TaskSchedulerImpl: Removed TaskSet 59.0, whose tasks have all completed, from pool