Я не получаю сообщения в msgItr, хотя в командной строке с помощью консольных утилит Kafka вижу сообщения в разделе (partition) топика. Пожалуйста, подскажите, что здесь происходит и что нужно сделать, чтобы получить сообщения.
Я пытался вывести сообщения на печать, но ничего не печатается. Возможно, это потому, что это RDD и вывод происходит на узле-исполнителе (executor), а не на драйвере.
// Streaming context with the batch interval (in seconds) taken from the properties file.
// A missing property fails with a descriptive message instead of the opaque
// NumberFormatException that `null.toInt` raises; surrounding whitespace is tolerated.
val ssc = new StreamingContext(conf, Seconds(
  Option(props.getProperty("spark.streaming.batchDuration"))
    .map(_.trim.toLong)
    .getOrElse(throw new IllegalArgumentException(
      "Missing required property 'spark.streaming.batchDuration'"))))
val topics = Set(props.getProperty("kafkaConf.topic"))
// TODO: Externalize StorageLevel to props file
// NOTE(review): not referenced by the direct stream created below; presumably used
// further on in the file — confirm, otherwise remove.
val storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
// Consumer configuration for the direct Kafka stream, read from `props`.
// Entries whose property is absent (getProperty returned null) are dropped, so Kafka
// falls back to its own default instead of receiving an explicit null value, which
// Kafka may reject during validation or use to override a built-in default.
val kafkaParams: Map[String, Object] = Map[String, Object](
  // The usual params; make sure to change the port in bootstrap.servers if 9092 is not
  // the TLS listener.
  // NOTE(review): the zookeeper.* keys belong to the old ZooKeeper-based consumer and are
  // not configs of the new KafkaConsumer used by the direct stream — confirm and remove.
  "zookeeper.connect" -> props.getProperty("kafkaConf.zookeeper.connect"),
  "bootstrap.servers" -> props.getProperty("kafkaConf.bootstrap.servers"),
  "group.id" -> props.getProperty("kafkaConf.group.id"),
  "zookeeper.connection.timeout.ms" -> props.getProperty("kafkaConf.zookeeper.connection.timeout.ms"),
  "security.protocol" -> props.getProperty("kafkaConf.security.protocol"),
  "ssl.protocol" -> props.getProperty("kafkaConf.ssl.protocol"),
  "ssl.keymanager.algorithm" -> props.getProperty("kafkaConf.ssl.keymanager.algorithm"),
  "ssl.enabled.protocols" -> props.getProperty("kafkaConf.ssl.enabled.protocols"),
  "ssl.truststore.type" -> props.getProperty("kafkaConf.ssl.truststore.type"),
  "ssl.keystore.type" -> props.getProperty("kafkaConf.ssl.keystore.type"),
  "ssl.truststore.location" -> props.getProperty("kafkaConf.ssl.truststore.location"),
  "ssl.truststore.password" -> props.getProperty("kafkaConf.ssl.truststore.password"),
  "ssl.keystore.location" -> props.getProperty("kafkaConf.ssl.keystore.location"),
  "ssl.keystore.password" -> props.getProperty("kafkaConf.ssl.keystore.password"),
  "ssl.key.password" -> props.getProperty("kafkaConf.ssl.key.password"),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> props.getProperty("kafkaConf.auto.offset.reset"),
  // Still required: a missing value fails fast in toBoolean rather than silently
  // falling back to Kafka's default.
  "enable.auto.commit" -> (props.getProperty("kafkaConf.enable.auto.commit").toBoolean: java.lang.Boolean),
  // NOTE(review): serializers are producer settings and have no effect on this consumer;
  // presumably kept for a producer elsewhere — confirm.
  "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
  "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer"
  // Optional tuning knobs, currently disabled:
  //"heartbeat.interval.ms" -> props.getProperty("kafkaConf.heartbeat.interval.ms"),
  //"session.timeout.ms" -> props.getProperty("kafkaConf.session.timeout.ms")
).filter { case (_, value) => Option(value).isDefined }
// Must use the direct API, as the old receiver-based API does not support SSL.
log.debug("Creating direct kafka stream")
val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
  Subscribe[String, String](topics, kafkaParams))
// Per-batch handler for the direct Kafka stream.
// The closure passed to foreachPartition below is shipped to executors: the stack trace
// further down shows HbaseConnect.mainMethod being called from Executor$TaskRunner inside
// a YARN container, so any println/log output from inside msgItr.foreach appears in the
// executor (container) logs, not in the driver console.
// NOTE(review): DStream.foreachRDD returns Unit, so `res` presumably carries no useful
// value — confirm and drop the binding.
val res = kafkaStream.foreachRDD((kafkaRdd: RDD[ConsumerRecord[String, String]]) => {
val numPartitions = kafkaRdd.getNumPartitions
log.info(s"Processing RDD with '$numPartitions' partitions.")
// Only one partition for the kafka topic is supported at this time
// (assumes the direct stream's RDD partitions correspond 1:1 to Kafka topic-partitions — confirm).
if (numPartitions != 1) {
throw new RuntimeException("Kafka topic must have 1 partition")
}
// Offset ranges consumed by this batch.
// NOTE(review): not used in the visible code; if kafkaConf.enable.auto.commit is false
// these presumably need to be committed after processing (e.g. CanCommitOffsets.commitAsync) — confirm.
val offsetRanges = kafkaRdd.asInstanceOf[HasOffsetRanges].offsetRanges
// Executed on an executor, once per partition of this batch's RDD.
kafkaRdd.foreachPartition((msgItr: Iterator[ConsumerRecord[String, String]]) => {
// Logger re-obtained inside the closure (shadows the outer `log`); output goes to the executor's log.
val log = LogManager.getRootLogger()
msgItr.foreach((kafkaMsg: ConsumerRecord[String, String]) => {
// The HBase connection fails here with a Kerberos authentication error (see the log below):
// the executor JVM runs as service_account with auth:SIMPLE and has no Kerberos TGT.
// TODO(review): give executors credentials, e.g. have the driver obtain HBase delegation
// tokens (spark-submit --principal/--keytab with HBase credentials enabled) or call
// UserGroupInformation.loginUserFromKeytab on the executor before connecting.
// The trace also shows the HBase connect running inside the per-message foreach;
// consider opening one connection per partition instead.
2018-09-19 15:28:01 INFO ZooKeeper:100 - Client environment:user.home=/home/service_account
2018-09-19 15:28:01 INFO ZooKeeper:100 - Client environment:user.dir=/data/09/yarn/nm/usercache/service_account/appcache/application_1536891989660_9297/container_e208_1536891989660_9297_01_000002
2018-09-19 15:28:01 INFO ZooKeeper:438 - Initiating client connection, connectString=depp-cdhmn-d1.domnnremvd.com:2181,depp-cdhmn-d2.domnnremvd.com:2181,depp-cdhmn-d3.domnnremvd.com:2181 sessionTimeout=90000 watcher=hconnection-0x16648f570x0, quorum=depp-cdhmn-d1.domnnremvd.com:2181,depp-cdhmn-d2.domnnremvd.com:2181,depp-cdhmn-d3.domnnremvd.com:2181, baseZNode=/hbase
2018-09-19 15:28:01 INFO ClientCnxn:975 - Opening socket connection to server depp-cdhmn-d3.domnnremvd.com/999.99.999.777:2181. Will not attempt to authenticate using SASL (unknown error)
2018-09-19 15:28:01 INFO ClientCnxn:852 - Socket connection established, initiating session, client: /999.99.999.999:33314, server: depp-cdhmn-d3.domnnremvd.com/999.99.999.777:2181
2018-09-19 15:28:01 INFO ClientCnxn:1235 - Session establishment complete on server depp-cdhmn-d3.domnnremvd.com/999.99.999.777:2181, sessionid = 0x365cb965ff33958, negotiated timeout = 60000
false
false
2018-09-19 15:28:02 WARN UserGroupInformation:1923 - PriviledgedActionException as:service_account (auth:SIMPLE) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
2018-09-19 15:28:02 WARN RpcClientImpl:675 - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
2018-09-19 15:28:02 ERROR RpcClientImpl:685 - SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:181)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:618)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$700(RpcClientImpl.java:163)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:744)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:741)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:741)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:907)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:874)
at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1243)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:227)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:336)
at org.apache.hadoop.hbase.protobuf.generated.MasterProtos$MasterService$BlockingStub.isMasterRunning(MasterProtos.java:58383)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation$MasterServiceStubMaker.isMasterRunning(ConnectionManager.java:1712)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation$StubMaker.makeStubNoRetries(ConnectionManager.java:1650)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation$StubMaker.makeStub(ConnectionManager.java:1672)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation$MasterServiceStubMaker.makeStub(ConnectionManager.java:1701)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getKeepAliveMasterService(ConnectionManager.java:1858)
at org.apache.hadoop.hbase.client.MasterCallable.prepare(MasterCallable.java:38)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:134)
at org.apache.hadoop.hbase.client.HBaseAdmin.executeCallable(HBaseAdmin.java:4313)
at org.apache.hadoop.hbase.client.HBaseAdmin.executeCallable(HBaseAdmin.java:4305)
at org.apache.hadoop.hbase.client.HBaseAdmin.listTableNames(HBaseAdmin.java:533)
at org.apache.hadoop.hbase.client.HBaseAdmin.listTableNames(HBaseAdmin.java:517)
at com.company.etl.HbaseConnect.mainMethod(HbaseConnect.scala:39)
at com.company.etl.App$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(App.scala:205)
at com.company.etl.App$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(App.scala:178)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
at com.company.etl.App$$anonfun$1$$anonfun$apply$2.apply(App.scala:178)
at com.company.etl.App$$anonfun$1$$anonfun$apply$2.apply(App.scala:161)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:122)
at sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
at sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:224)
at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
... 43 more