Куду Клиент отказывает с исключениями после запуска в течение нескольких дней - PullRequest
0 голосов
/ 24 октября 2018

У меня есть процесс Scala / Spark / Kafka, который я запускаю.Когда я впервые запускаю процесс, я создаю объект KuduClient, используя созданную мной функцию, которую я разделяю между классами.Для этой работы я создаю KuduClient только один раз и позволяю процессу выполняться непрерывно.Я заметил, что через несколько дней я часто получаю исключения.

Я не совсем уверен, что делать.Я думаю, что возможно было бы создать новый клиент Kudu каждый день или около того, но я не уверен, как это сделать и в этом случае.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.json.JSONObject
import org.apache.kudu.client.KuduClient
import org.apache.log4j.Logger

object Thing extends Serializable {

  @transient lazy val client: KuduClient = createKuduClient(config)
  @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)

  def main(args: Array[String]) {

    UtilFunctions.loadConfig(args) //I send back a config object.
    UtilFunctions.loadLogger() //factory method to load logger

    val props: Map[String, String] = setKafkaProperties()

    val topic = Set(config.getString("config.TOPIC_NAME"))

    val conf = new SparkConf().setMaster("local[2]").setAppName(config.getString("config.SPARK_APP_NAME"))
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint(config.getString("config.SPARK_CHECKPOINT_NAME"))

    // val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, props, topic)
    val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topic, props))
    val distRecordsStream = kafkaStream.map(record => (record.key(), record.value()))
    distRecordsStream.window(Seconds(10), Seconds(10))
    distRecordsStream.foreachRDD(distRecords => {
      logger.info(distRecords + " : " + distRecords.count())
      distRecords.foreach(record => {
        logger.info(record._2)
        MyClass.DoSomethingWithThisData(new JSONObject(record._2), client)
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }

  def createKuduClient(config: Config): KuduClient = {
    var client: KuduClient = null
    try{
      client = new KuduClient.KuduClientBuilder(config.getString("config.KUDU_MASTER"))
        .defaultAdminOperationTimeoutMs(config.getInt("config.KUDU_ADMIN_TIMEOUT_S") * 1000)
        .defaultOperationTimeoutMs(config.getInt("config.KUDU_OPERATION_TIMEOUT_S") * 1000)
        .build()
    }
    catch {
      case e: Throwable =>
        logger.error(e.getMessage)
        logger.error(e.getStackTrace.toString)
        Thread.sleep(10000) //try to create a new kudu client
        client = createKuduClient(config)
    }
    client //return
  }

  def setKafkaProperties(): Map[String, String] = {


    val zookeeper = config.getString("config.ZOOKEEPER")
    val offsetReset = config.getString("config.OFFSET_RESET")
    val brokers = config.getString("config.BROKERS")
    val groupID = config.getString("config.GROUP_ID")
    val deserializer = config.getString("config.DESERIALIZER")
    val autoCommit = config.getString("config.AUTO_COMMIT")
    val maxPollRecords = config.getString("config.MAX_POLL_RECORDS")
    val maxPollIntervalms = config.getString("config.MAX_POLL_INTERVAL_MS")

    val props = Map(
      "bootstrap.servers" -> brokers,
      "zookeeper.connect" -> zookeeper,
      "group.id" -> groupID,
      "key.deserializer" -> deserializer,
      "value.deserializer" -> deserializer,
      "enable.auto.commit" -> autoCommit,
      "auto.offset.reset" -> offsetReset,
      "max.poll.records" -> maxPollRecords,
      "max.poll.interval.ms" -> maxPollIntervalms)
    props
  }

}

Исключения ниже.Я удалил IP-адрес вместо использования "x"

ОШИБКА client.TabletClient: [Peer master-ip-xxx-xx-xxx-40.ec2.internal: 7051] Неожиданное исключение извниз по течению [id: 0x42ba3f4d, /xxx.xx.xxx.39:36820 => ip-xxx-xxx-xxx-40.ec2.internal / xxx.xx.xxx.40: 7051] java.lang.RuntimeException: можетне десериализовать ответ, несовместимый RPC?Ошибка: шаг в org.apache.kudu.client.KuduRpc.readProtobuf (KuduRpc.java:383) в org.apache.kudu.client.Negotiator.parseSaslMsgResponse (Negotiator.java:282) в org.apache.kudu.li.Ngotiator.handleResponse (Negotiator.java:235) в org.apache.kudu.client.Negotiator.messageReceived (Negotiator.java:229) в org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleCandnelUstream.handleUpstream (SimpleChannelUpstreamHandler.java:70) в org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream (DefaultChannelPipeline.java:564) в org.apache.kudu.client.shaded.jboss.netty.channel.DefaultChannelPipeline $ DefaultChannelHandlerContext.sendUpstream (DefaultChannelPipeline.java:791) в org.apache.kudu.client.shaded.org.jboss.netty.handler.timeout.ReadTimeoutHandler.messageReceived (ReadTimeoutHandler.java:184)в org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream (SimpleChannelUpstreamHandler.java:70) в org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream (DefaultChannelPipeline.java:564) в org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline $ DefaultChanteH.sendUpstream (DefaultChannelPipeline.java:791) в org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived (Channels.java:296) в org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream (OneToOneDecoder.java:70) в org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream (DefaultChannelPipe))org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline $ DefaultChannelHandlerContext.sendUpstream (DefaultChannelPipeline.java:791) в org.apache.kudu.client.shaded.org.jboss.netty.nelnel.Channel.fireMessageReceived (Channels.java:296) по адресу org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived (FrameDecoder.java:462) в org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode (FrameDecoder.java:443) в org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.: 70) в org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream (DefaultChannelPipeline.java:564) в org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline $ DefaultChannelHandlerContext.sendUpstream (DefaultChannelPipeline.java:791) в org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived (Channels.java:296)в org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived (FrameDecoder.java:462) в org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode (FrameDecoder.java:443) в org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived (FrameDecoder.java:303 в)org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream (SimpleChannelUpstreamHandler.java:70) в org.apache.kudu.client.shaded.org.jboss.netty.channel.DefstreamCanePhanipe(DefaultChannelPipeline.java:564) в org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream (DefaultChannelPipeline.java:559) в org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived (Channels.java:268) в org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived (Channels.java:255) в org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioWorker.read (NioWorker.java:88) в org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) в org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run (AbstractNioSelector.java:337) в org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run (AbstractNioWorker.java:89) в org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioWorker.run (NioWorker.java:178) в org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run (ThreadRenamingRunnable.java:108) в org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker $ 1.run (DeadLockProofWorker.java:42) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPerol: 624) на java.lang.Thread.run (Thread.java:748)

У меня естьКроме того, после некоторого времени выполнения некоторых исключений, подобных этим,

java.io.IOException: Все датододы DatanodeInfoWithStorage[xxx.xx.xxx.36: 1004, DS-55c403c3-203a-4dac-b383-72fcdb686185, DISK] плохие.Прерывание ... at org.apache.hadoop.hdfs.DFSOutputStream $ DataStreamer.setupPipelineForAppendOrRecovery (DFSOutputStream.java:1465) в org.apache.hadoop.hdfs.DFSOutputStream $ DataStreamer.processDatanodeava.ap_set_set_setup_set_бытьевозаключении.hadoop.hdfs.DFSOutputStream $ DataStreamer.run (DFSOutputSt

Это как-то связано с слишком большим количеством открытых файлов? Способ «очистить» эти файлы, когда они достигнут предела?

...