java.lang.IllegalArgumentException: требование не выполнено: невозможно записать в закрытый ByteBufferOutputStream - PullRequest
0 голосов
/ 26 мая 2019

Kafkaconsumer небезопасен для многопоточного доступа 4 ответа Kafka - это источник данных для потоковой передачи искры, а приемник передается заново.Тем не менее, «java.lang.IllegalArgumentException: требование не выполнено: не удается записать в закрытый ByteBufferOutputStream» выходит, как ее решить?

def initKafkaParams(bootstrap_servers: String, groupId: String, duration: String = "5000") : Map[String,Object] = {
    val kafkaParams = Map[String,Object](
      "bootstrap.servers" -> bootstrap_servers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean), 
      "auto.commit.interval.ms" -> duration
    )
    kafkaParams
  }

val kafkaParams = KafkaUtil.initKafkaParams(Configuration.bootstrap_servers_log, groupId, duration)
    val topics = Array(topic)
    val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    val cachedStream = stream.cache()
    val closed_uids = cachedStream.map(record => parseJson(record.value)).filter(record => record != null)
    closed_uids.foreachRDD(rdd =>
      rdd.foreachPartition(rows =>
      {
        val rc = RedisClient("recall")
        try {
          val redis = rc.getRedisClient()
          val pipe = redis.pipelined()
          val redisKey = "zyf_radio"
          rows.foreach(r => {
            //val redis = rc.getRedisClient()
            pipe.sadd(redisKey, r)
          })
          pipe.sync()
          redis.close()
        } catch {
          case e: Exception => println("redis error!")
        }
      }
      )
    )

java.lang.IllegalArgumentException: требование не выполнено: невозможно записать в закрытоеByteBufferOutputStream в scala.Predef $ .require (Predef.scala: 224) в org.apache.spark.util.ByteBufferOutputStream.write (ByteBufferOutputStream.scala: 40) в блоке java.io.ObjectOutputTutStutOutDoutOutDataTawToutDataTawTyDataTawStreamD77: $..apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala: 43) в org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSerializer.scala: 100) в org.apache.spark.rpc.netty.NettyRpcEEnv.(NettyRpcEnv.scala: 253) в org.apache.spark.rpc.netty.NettyRpcEnv.send (NettyRpcEnv.scala: 192) в org.apache.spark.rpc.netty.NettyRpcEndpointRef.send (NettyRpcEnv.scala: 512) в org.apache.sparkecegstatusUpdate (CoarseGrainedExecutorBackend.scala: 142) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 412) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolEx11) atj).concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) в java.lang.Thread.run (Thread.java:748) 74 631 1 (спекулятивный) SUCCESS RACK_LOCAL 5 / c1-dsj-hadoop040.bj stdout 201 stdout stder/ 05/26 10:37:40 38 мс 27,0 КБ / 20

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...