как читать сообщения в кафке с помощью Spark Scala API - PullRequest
0 голосов
/ 19 сентября 2018

Я не могу получать сообщения в msgItr, где, как и в командной строке, используя команды kafka, я могу видеть сообщения в разделе.пожалуйста, дайте мне знать, что здесь происходит.что я должен сделать, чтобы получить сообщения.

Я пытался напечатать, но ничего не печатает.Может быть потому, что это СДР и что-то печатает на узле исполнителя.

    val ssc = new StreamingContext(conf, Seconds(props.getProperty("spark.streaming.batchDuration").toInt))

val topics = Set(props.getProperty("kafkaConf.topic"))

// TODO: Externalize StorageLevel to props file
val storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2

//"zookeeper.connect" -> "fepp-cdhmn-d2.fepoc.com"
val kafkaParams = Map[String, Object](
  // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
  "zookeeper.connect" -> props.getProperty("kafkaConf.zookeeper.connect"),
  "bootstrap.servers" -> props.getProperty("kafkaConf.bootstrap.servers"),
  "group.id" -> props.getProperty("kafkaConf.group.id"),
  "zookeeper.connection.timeout.ms" -> props.getProperty("kafkaConf.zookeeper.connection.timeout.ms"),
  "security.protocol" -> props.getProperty("kafkaConf.security.protocol"),
  "ssl.protocol" -> props.getProperty("kafkaConf.ssl.protocol"),
  "ssl.keymanager.algorithm" -> props.getProperty("kafkaConf.ssl.keymanager.algorithm"),
  "ssl.enabled.protocols" -> props.getProperty("kafkaConf.ssl.enabled.protocols"),
  "ssl.truststore.type" -> props.getProperty("kafkaConf.ssl.truststore.type"),
  "ssl.keystore.type" -> props.getProperty("kafkaConf.ssl.keystore.type"),
  "ssl.truststore.location" -> props.getProperty("kafkaConf.ssl.truststore.location"),
  "ssl.truststore.password" -> props.getProperty("kafkaConf.ssl.truststore.password"),
  "ssl.keystore.location" -> props.getProperty("kafkaConf.ssl.keystore.location"),
  "ssl.keystore.password" -> props.getProperty("kafkaConf.ssl.keystore.password"),
  "ssl.key.password" -> props.getProperty("kafkaConf.ssl.key.password"),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> props.getProperty("kafkaConf.auto.offset.reset"),
  "enable.auto.commit" -> (props.getProperty("kafkaConf.enable.auto.commit").toBoolean: java.lang.Boolean),
  "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
  "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer"
  //"heartbeat.interval.ms" -> props.getProperty("kafkaConf.heartbeat.interval.ms"),
  //"session.timeout.ms" -> props.getProperty("kafkaConf.session.timeout.ms")
)

// Must use the direct api as the old api does not support SSL
log.debug("Creating direct kafka stream")
val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
  Subscribe[String, String](topics, kafkaParams))


val res = kafkaStream.foreachRDD((kafkaRdd: RDD[ConsumerRecord[String, String]]) => {
          val numPartitions = kafkaRdd.getNumPartitions
          log.info(s"Processing RDD with '$numPartitions' partitions.")

      // Only one partition for the kafka topic is supported at this time
      if (numPartitions != 1) {
        throw new RuntimeException("Kafka topic must have 1 partition")
      }

      val offsetRanges = kafkaRdd.asInstanceOf[HasOffsetRanges].offsetRanges

      kafkaRdd.foreachPartition((msgItr: Iterator[ConsumerRecord[String, String]]) => {

        val log = LogManager.getRootLogger()


        msgItr.foreach((kafkaMsg: ConsumerRecord[String, String]) => {

 // Hbase connection Fails here. because of authentication with below error


2018-09-19 15:28:01 INFO  ZooKeeper:100 - Client environment:user.home=/home/service_account
2018-09-19 15:28:01 INFO  ZooKeeper:100 - Client environment:user.dir=/data/09/yarn/nm/usercache/service_account/appcache/application_1536891989660_9297/container_e208_1536891989660_9297_01_000002
2018-09-19 15:28:01 INFO  ZooKeeper:438 - Initiating client connection, connectString=depp-cdhmn-d1.domnnremvd.com:2181,depp-cdhmn-d2.domnnremvd.com:2181,depp-cdhmn-d3.domnnremvd.com:2181 sessionTimeout=90000 watcher=hconnection-0x16648f570x0, quorum=depp-cdhmn-d1.domnnremvd.com:2181,depp-cdhmn-d2.domnnremvd.com:2181,depp-cdhmn-d3.domnnremvd.com:2181, baseZNode=/hbase
2018-09-19 15:28:01 INFO  ClientCnxn:975 - Opening socket connection to server depp-cdhmn-d3.domnnremvd.com/999.99.999.777:2181. Will not attempt to authenticate using SASL (unknown error)
2018-09-19 15:28:01 INFO  ClientCnxn:852 - Socket connection established, initiating session, client: /999.99.999.999:33314, server: depp-cdhmn-d3.domnnremvd.com/999.99.999.777:2181
2018-09-19 15:28:01 INFO  ClientCnxn:1235 - Session establishment complete on server depp-cdhmn-d3.domnnremvd.com/999.99.999.777:2181, sessionid = 0x365cb965ff33958, negotiated timeout = 60000
false
false
2018-09-19 15:28:02 WARN  UserGroupInformation:1923 - PriviledgedActionException as:service_account (auth:SIMPLE) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
2018-09-19 15:28:02 WARN  RpcClientImpl:675 - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
2018-09-19 15:28:02 ERROR RpcClientImpl:685 - SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
    at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
    at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:181)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:618)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$700(RpcClientImpl.java:163)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:744)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:741)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:741)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:907)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:874)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1243)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:227)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:336)
    at org.apache.hadoop.hbase.protobuf.generated.MasterProtos$MasterService$BlockingStub.isMasterRunning(MasterProtos.java:58383)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation$MasterServiceStubMaker.isMasterRunning(ConnectionManager.java:1712)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation$StubMaker.makeStubNoRetries(ConnectionManager.java:1650)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation$StubMaker.makeStub(ConnectionManager.java:1672)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation$MasterServiceStubMaker.makeStub(ConnectionManager.java:1701)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getKeepAliveMasterService(ConnectionManager.java:1858)
    at org.apache.hadoop.hbase.client.MasterCallable.prepare(MasterCallable.java:38)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:134)
    at org.apache.hadoop.hbase.client.HBaseAdmin.executeCallable(HBaseAdmin.java:4313)
    at org.apache.hadoop.hbase.client.HBaseAdmin.executeCallable(HBaseAdmin.java:4305)
    at org.apache.hadoop.hbase.client.HBaseAdmin.listTableNames(HBaseAdmin.java:533)
    at org.apache.hadoop.hbase.client.HBaseAdmin.listTableNames(HBaseAdmin.java:517)
    at com.company.etl.HbaseConnect.mainMethod(HbaseConnect.scala:39)
    at com.company.etl.App$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(App.scala:205)
    at com.company.etl.App$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(App.scala:178)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
    at com.company.etl.App$$anonfun$1$$anonfun$apply$2.apply(App.scala:178)
    at com.company.etl.App$$anonfun$1$$anonfun$apply$2.apply(App.scala:161)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
    at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
    at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:122)
    at sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
    at sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:224)
    at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
    at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
    at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
    ... 43 more

Ответы [ 3 ]

0 голосов
/ 09 марта 2019

я столкнулся с точно такой же проблемой.Происходит то, что узел-исполнитель пытается записать в hbase и не имеет учетных данных.Что вам нужно сделать, это передать файл keytab исполнителям и явно вызвать аутентификацию KDC WITH В блоке executor

UserGroupInformation.loginUserFromKeytab ("hdfs-user@MYCORP.NET", "/ home / hdfs-пользователь / HDFS-user.keytab ");

0 голосов
/ 10 марта 2019

Из стека трассировки похоже, что kafka аутентифицируется с sasl.Поддерживаемые механизмы SASL:

  1. GSSAPI (Kerberos)
  2. OAUTHBEARER
  3. SCRAM
  4. PLAIN

Из вашей трассировки стека kafka настраивается с использованием GSSAPI, и вам необходимо соответственно пройти аутентификацию.Вы авторизуетесь на SSL, а не SASL.Проверьте эту ссылку для шагов для аутентификации.

0 голосов
/ 24 сентября 2018

Это из-за аутентификации kerberos.

Установка системных свойств.

  System.setProperty("java.security.auth.login.config","/your/conf/directory/kafkajaas.conf");
  System.setProperty("sun.security.jgss.debug","true");
  System.setProperty("javax.security.auth.useSubjectCredsOnly","false");
  System.setProperty("java.security.krb5.conf", "/your/krb5/conf/directory/krb5.conf");

Вы можете читать данные из Cloudera Kafka.(Производитель)

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092")
  .option("subscribe", "test")
  .option("kafka.security.protocol","SASL_PLAINTEXT")
  .option("kafka.sasl.kerberos.service.name","kafka")

Вы можете записать данные в тему Cloudera Kafka (Потребитель)

val query = blacklistControl.select(to_json(struct("Column1","Column2")).alias("value"))
  .writeStream
  .format("kafka")
  .option("checkpointLocation", "/your/empty/directory")
  .option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092")
  .option("kafka.security.protocol","SASL_PLAINTEXT")
  .option("kafka.sasl.kerberos.service.name","kafka")
  .option("topic", "topic_xdr")
  .start()
...