У меня есть процесс Scala / Spark / Kafka, который я запускаю.Когда я впервые запускаю процесс, я создаю объект KuduClient, используя созданную мной функцию, которую я разделяю между классами.Для этой работы я создаю KuduClient только один раз и позволяю процессу выполняться непрерывно.Я заметил, что через несколько дней я часто получаю исключения.
Я не совсем уверен, что делать.Я думаю, что возможно было бы создать новый клиент Kudu каждый день или около того, но я не уверен, как это сделать и в этом случае.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.json.JSONObject
import org.apache.kudu.client.KuduClient
import org.apache.log4j.Logger
object Thing extends Serializable {
@transient lazy val client: KuduClient = createKuduClient(config)
@transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
def main(args: Array[String]) {
UtilFunctions.loadConfig(args) //I send back a config object.
UtilFunctions.loadLogger() //factory method to load logger
val props: Map[String, String] = setKafkaProperties()
val topic = Set(config.getString("config.TOPIC_NAME"))
val conf = new SparkConf().setMaster("local[2]").setAppName(config.getString("config.SPARK_APP_NAME"))
val ssc = new StreamingContext(conf, Seconds(10))
ssc.sparkContext.setLogLevel("ERROR")
ssc.checkpoint(config.getString("config.SPARK_CHECKPOINT_NAME"))
// val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, props, topic)
val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topic, props))
val distRecordsStream = kafkaStream.map(record => (record.key(), record.value()))
distRecordsStream.window(Seconds(10), Seconds(10))
distRecordsStream.foreachRDD(distRecords => {
logger.info(distRecords + " : " + distRecords.count())
distRecords.foreach(record => {
logger.info(record._2)
MyClass.DoSomethingWithThisData(new JSONObject(record._2), client)
})
})
ssc.start()
ssc.awaitTermination()
}
def createKuduClient(config: Config): KuduClient = {
var client: KuduClient = null
try{
client = new KuduClient.KuduClientBuilder(config.getString("config.KUDU_MASTER"))
.defaultAdminOperationTimeoutMs(config.getInt("config.KUDU_ADMIN_TIMEOUT_S") * 1000)
.defaultOperationTimeoutMs(config.getInt("config.KUDU_OPERATION_TIMEOUT_S") * 1000)
.build()
}
catch {
case e: Throwable =>
logger.error(e.getMessage)
logger.error(e.getStackTrace.toString)
Thread.sleep(10000) //try to create a new kudu client
client = createKuduClient(config)
}
client //return
}
def setKafkaProperties(): Map[String, String] = {
val zookeeper = config.getString("config.ZOOKEEPER")
val offsetReset = config.getString("config.OFFSET_RESET")
val brokers = config.getString("config.BROKERS")
val groupID = config.getString("config.GROUP_ID")
val deserializer = config.getString("config.DESERIALIZER")
val autoCommit = config.getString("config.AUTO_COMMIT")
val maxPollRecords = config.getString("config.MAX_POLL_RECORDS")
val maxPollIntervalms = config.getString("config.MAX_POLL_INTERVAL_MS")
val props = Map(
"bootstrap.servers" -> brokers,
"zookeeper.connect" -> zookeeper,
"group.id" -> groupID,
"key.deserializer" -> deserializer,
"value.deserializer" -> deserializer,
"enable.auto.commit" -> autoCommit,
"auto.offset.reset" -> offsetReset,
"max.poll.records" -> maxPollRecords,
"max.poll.interval.ms" -> maxPollIntervalms)
props
}
}
Исключения ниже.Я удалил IP-адрес вместо использования "x"
ОШИБКА client.TabletClient: [Peer master-ip-xxx-xx-xxx-40.ec2.internal: 7051] Неожиданное исключение извниз по течению [id: 0x42ba3f4d, /xxx.xx.xxx.39:36820 => ip-xxx-xxx-xxx-40.ec2.internal / xxx.xx.xxx.40: 7051] java.lang.RuntimeException: можетне десериализовать ответ, несовместимый RPC?Ошибка: шаг в org.apache.kudu.client.KuduRpc.readProtobuf (KuduRpc.java:383) в org.apache.kudu.client.Negotiator.parseSaslMsgResponse (Negotiator.java:282) в org.apache.kudu.li.Ngotiator.handleResponse (Negotiator.java:235) в org.apache.kudu.client.Negotiator.messageReceived (Negotiator.java:229) в org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleCandnelUstream.handleUpstream (SimpleChannelUpstreamHandler.java:70) в org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream (DefaultChannelPipeline.java:564) в org.apache.kudu.client.shaded.jboss.netty.channel.DefaultChannelPipeline $ DefaultChannelHandlerContext.sendUpstream (DefaultChannelPipeline.java:791) в org.apache.kudu.client.shaded.org.jboss.netty.handler.timeout.ReadTimeoutHandler.messageReceived (ReadTimeoutHandler.java:184)в org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream (SimpleChannelUpstreamHandler.java:70) в org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream (DefaultChannelPipeline.java:564) в org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline $ DefaultChanteH.sendUpstream (DefaultChannelPipeline.java:791) в org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived (Channels.java:296) в org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream (OneToOneDecoder.java:70) в org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream (DefaultChannelPipe))org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline $ DefaultChannelHandlerContext.sendUpstream (DefaultChannelPipeline.java:791) в org.apache.kudu.client.shaded.org.jboss.netty.nelnel.Channel.fireMessageReceived (Channels.java:296) по адресу org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived (FrameDecoder.java:462) в org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode (FrameDecoder.java:443) в org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.: 70) в org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream (DefaultChannelPipeline.java:564) в org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline $ DefaultChannelHandlerContext.sendUpstream (DefaultChannelPipeline.java:791) в org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived (Channels.java:296)в org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived (FrameDecoder.java:462) в org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode (FrameDecoder.java:443) в org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived (FrameDecoder.java:303 в)org.apache.kudu.client.shaded.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream (SimpleChannelUpstreamHandler.java:70) в org.apache.kudu.client.shaded.org.jboss.netty.channel.DefstreamCanePhanipe(DefaultChannelPipeline.java:564) в org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream (DefaultChannelPipeline.java:559) в org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived (Channels.java:268) в org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.fireMessageReceived (Channels.java:255) в org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioWorker.read (NioWorker.java:88) в org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) в org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run (AbstractNioSelector.java:337) в org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run (AbstractNioWorker.java:89) в org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioWorker.run (NioWorker.java:178) в org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run (ThreadRenamingRunnable.java:108) в org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker $ 1.run (DeadLockProofWorker.java:42) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPerol: 624) на java.lang.Thread.run (Thread.java:748)
У меня естьКроме того, после некоторого времени выполнения некоторых исключений, подобных этим,
java.io.IOException: Все датододы DatanodeInfoWithStorage[xxx.xx.xxx.36: 1004, DS-55c403c3-203a-4dac-b383-72fcdb686185, DISK] плохие.Прерывание ... at org.apache.hadoop.hdfs.DFSOutputStream $ DataStreamer.setupPipelineForAppendOrRecovery (DFSOutputStream.java:1465) в org.apache.hadoop.hdfs.DFSOutputStream $ DataStreamer.processDatanodeava.ap_set_set_setup_set_бытьевозаключении.hadoop.hdfs.DFSOutputStream $ DataStreamer.run (DFSOutputSt
Это как-то связано с слишком большим количеством открытых файлов? Способ «очистить» эти файлы, когда они достигнут предела?