Kafkaconsumer небезопасен для многопоточного доступа 4 ответа Kafka - это источник данных для потоковой передачи искры, а приемник передается заново.Тем не менее, «java.lang.IllegalArgumentException: требование не выполнено: не удается записать в закрытый ByteBufferOutputStream» выходит, как ее решить?
def initKafkaParams(bootstrap_servers: String, groupId: String, duration: String = "5000") : Map[String,Object] = {
val kafkaParams = Map[String,Object](
"bootstrap.servers" -> bootstrap_servers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (true: java.lang.Boolean),
"auto.commit.interval.ms" -> duration
)
kafkaParams
}
val kafkaParams = KafkaUtil.initKafkaParams(Configuration.bootstrap_servers_log, groupId, duration)
val topics = Array(topic)
val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
val cachedStream = stream.cache()
val closed_uids = cachedStream.map(record => parseJson(record.value)).filter(record => record != null)
closed_uids.foreachRDD(rdd =>
rdd.foreachPartition(rows =>
{
val rc = RedisClient("recall")
try {
val redis = rc.getRedisClient()
val pipe = redis.pipelined()
val redisKey = "zyf_radio"
rows.foreach(r => {
//val redis = rc.getRedisClient()
pipe.sadd(redisKey, r)
})
pipe.sync()
redis.close()
} catch {
case e: Exception => println("redis error!")
}
}
)
)
java.lang.IllegalArgumentException: требование не выполнено: невозможно записать в закрытоеByteBufferOutputStream в scala.Predef $ .require (Predef.scala: 224) в org.apache.spark.util.ByteBufferOutputStream.write (ByteBufferOutputStream.scala: 40) в блоке java.io.ObjectOutputTutStutOutDoutOutDataTawToutDataTawTyDataTawStreamD77: $..apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala: 43) в org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSerializer.scala: 100) в org.apache.spark.rpc.netty.NettyRpcEEnv.(NettyRpcEnv.scala: 253) в org.apache.spark.rpc.netty.NettyRpcEnv.send (NettyRpcEnv.scala: 192) в org.apache.spark.rpc.netty.NettyRpcEndpointRef.send (NettyRpcEnv.scala: 512) в org.apache.sparkecegstatusUpdate (CoarseGrainedExecutorBackend.scala: 142) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 412) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolEx11) atj).concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) в java.lang.Thread.run (Thread.java:748) 74 631 1 (спекулятивный) SUCCESS RACK_LOCAL 5 / c1-dsj-hadoop040.bj stdout 201 stdout stder/ 05/26 10:37:40 38 мс 27,0 КБ / 20