I am running a Structured Streaming job with a MongoDB sink and I am hitting this error:
org.bson.json.JsonParseException: JSON reader expected token type 'LEFT_PAREN' but found ','
I created MongoDBForeachWriter.scala, Helpers.scala, and StructuredStreamingProgram.scala, compiled them into a jar, and submitted the job with spark-submit.
Driver stacktrace:
19/05/09 00:47:04 INFO scheduler.DAGScheduler: Job 1 failed: start at StructuredStreamingProgram.scala:45, took 4.406636 s
19/05/09 00:47:04 ERROR streaming.MicroBatchExecution: Query [id = 5e0a84f1-8b03-4495-a864-cfa325ce8e3, runId = 6396a613-0a69-4e7b-8d47-f0a64422339e] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, ausplcdhdata12.us.dell.com, executor 1): org.bson.json.JsonParseException: JSON reader expected token type 'LEFT_PAREN' but found ','.
at org.bson.json.JsonReader.verifyToken(JsonReader.java:507)
at org.bson.json.JsonReader.visitDateTimeConstructorWithOutNew(JsonReader.java:928)
at org.bson.json.JsonReader.readBsonType(JsonReader.java:201)
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:680)
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
at org.bson.BsonDocument.parse(BsonDocument.java:62)
at org.mongodb.scala.bson.BsonDocument$.apply(BsonValue.scala:191)
at org.mongodb.scala.bson.collection.mutable.Document$.apply(Document.scala:55)
at example.MongoDBForeachWriter.process(MongoDBForeachWriter.scala:43)
at example.MongoDBForeachWriter.process(MongoDBForeachWriter.scala:14)
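The stack frames point at the Document(valueStr) call inside process() (MongoDBForeachWriter.scala:43 in the code below), so the raw Kafka value is being parsed as a JSON document. I do not know which exact payload triggers it, but as a minimal standalone sketch, a value containing a bare Date token (the payload below is made up) fails with the same message:

import org.mongodb.scala.bson.collection.mutable.Document

object ParseRepro extends App {
  // Hypothetical malformed payload: after the bare `Date` token the BSON JSON
  // reader expects '(' but sees ',', which matches the exception above.
  // Document(json) delegates to org.bson.BsonDocument.parse under the hood.
  val badValue = """{"ts": Date, "msg": "hello"}"""
  val doc = Document(badValue) // throws org.bson.json.JsonParseException
}

The full code of the three files follows.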
StructuredStreamingProgram.scala:

package example

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.util.LongAccumulator
import example.Helpers._
import java.util.Calendar

object StructuredStreamingProgram {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .appName("StructuredStreaming")
      .getOrCreate()

    import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "abcdef:9092")
      .option("subscribe", "testing_streaming")
      .load()

    val dfs = df.selectExpr("CAST(value AS STRING)")

    // sends to MongoDB once every 20 seconds
    val mongodb_uri = "mongodb://buser:pswd@acdedfg.com"
    val mdb_name = "HANZO_MDB"
    val mdb_collection = "testing_streaming"
    val CountAccum: LongAccumulator = spark.sparkContext.longAccumulator("mongostreamcount")

    val structuredStreamForeachWriter: MongoDBForeachWriter =
      new MongoDBForeachWriter(mongodb_uri, mdb_name, mdb_collection, CountAccum)

    val query = dfs.writeStream
      .foreach(structuredStreamForeachWriter)
      .trigger(Trigger.ProcessingTime("20 seconds"))
      .start()

    while (!spark.streams.awaitAnyTermination(60000)) {
      println(Calendar.getInstance().getTime() + " :: EventsCount = " + CountAccum.value)
    }
  }
}
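To see what the job actually hands to the writer, a debugging variant of the sink can be swapped in inside main() (a sketch that reuses the dfs frame and trigger defined above; it only prints the casted Kafka values to the console so their JSON can be inspected):

// Debugging sketch, not part of the job above: write the same CAST(value AS STRING)
// column to the console instead of MongoDB, to check whether each value is valid JSON.
val debugQuery = dfs.writeStream
  .format("console")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime("20 seconds"))
  .start()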
MongoDBForeachWriter.scala:

package example

import java.util.Calendar
import org.apache.spark.util.LongAccumulator
import org.apache.spark.sql.Row
import org.apache.spark.sql.ForeachWriter
import org.mongodb.scala._
import org.mongodb.scala.bson.collection.mutable.Document
import org.mongodb.scala.bson._
import example.Helpers._
import scala.util.Try

class MongoDBForeachWriter(p_uri: String,
                           p_dbName: String,
                           p_collectionName: String,
                           p_messageCountAccum: LongAccumulator) extends ForeachWriter[Row] {

  val mongodbURI = p_uri
  val dbName = p_dbName
  val collectionName = p_collectionName
  val messageCountAccum = p_messageCountAccum

  var mongoClient: MongoClient = null
  var db: MongoDatabase = null
  var collection: MongoCollection[Document] = null

  def ensureMongoDBConnection(): Unit = {
    if (mongoClient == null) {
      mongoClient = MongoClient(mongodbURI)
      db = mongoClient.getDatabase(dbName)
      collection = db.getCollection(collectionName)
    }
  }

  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }

  override def process(record: Row): Unit = {
    val valueStr = record.getAs[String]("value")

    val doc = Document(valueStr)
    doc += ("log_time" -> Calendar.getInstance().getTime())

    // lazy opening of MongoDB connection
    ensureMongoDBConnection()

    val result = collection.insertOne(doc).results()

    // tracks how many records I have processed
    if (messageCountAccum != null)
      messageCountAccum.add(1)
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (mongoClient != null) {
      Try {
        mongoClient.close()
      }
    }
  }
}
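Since the failure happens in Document(valueStr), a more defensive variant of process() could isolate the bad payloads (a sketch only, assuming it is acceptable to log and skip values that are not valid JSON; the real job may need different handling):

// Sketch of an alternative process() body for MongoDBForeachWriter.
// Assumes `import scala.util.{Failure, Success, Try}` at the top of the file.
override def process(record: Row): Unit = {
  val valueStr = record.getAs[String]("value")

  Try(Document(valueStr)) match {
    case Success(doc) =>
      doc += ("log_time" -> Calendar.getInstance().getTime())
      ensureMongoDBConnection()                 // lazy opening of MongoDB connection
      collection.insertOne(doc).results()
      if (messageCountAccum != null) messageCountAccum.add(1)
    case Failure(e) =>
      // Keep the raw value in the log so the malformed JSON can be inspected later.
      println(s"Skipping unparsable value: $valueStr (${e.getMessage})")
  }
}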
Helpers.scala:

package example

import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import org.mongodb.scala._

object Helpers {

  implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
    override val converter: Document => String = doc => doc.toJson
  }

  implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
    override val converter: C => String = doc => doc.toString
  }

  trait ImplicitObservable[C] {
    val observable: Observable[C]
    val converter: C => String

    def results(): Seq[C] = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS))
    def headResult(): C = Await.result(observable.head(), Duration(10, TimeUnit.SECONDS))

    def printResults(initial: String = ""): Unit = {
      if (initial.length > 0) print(initial)
      results().foreach(res => println(converter(res)))
    }

    def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
  }
}
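For completeness, the helpers above look like the blocking wrappers from the MongoDB Scala driver quickstart; the writer only uses results() to wait for each insert. A hypothetical standalone usage (the local URI, database and collection names below are made up):

import org.mongodb.scala._
import example.Helpers._

object HelpersUsage extends App {
  // results() blocks on the Observable for up to 10 seconds and throws
  // java.util.concurrent.TimeoutException if the server does not respond in time.
  val client = MongoClient("mongodb://localhost:27017")  // hypothetical local instance
  val coll = client.getDatabase("test").getCollection("demo")

  coll.insertOne(Document("probe" -> 1)).results()  // blocks until the insert is acknowledged
  coll.find().printResults("stored: ")              // prints each document as JSON
  client.close()
}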