scala- как я могу подтвердить, что конкретная тема существует на сервере Kafka (брокер)? - PullRequest
0 голосов
/ 10 декабря 2018

Я использую scala, spark и Kafka.У меня есть 2 вопроса.

1.Как подтвердить, что тема существует в брокере (сервере) Kafka?

2.Как подтвердить, работает ли сервер Kafka (сервер начальной загрузки)?

object kafkaProducer extends App {

  def sendMessages(): Unit = {


//define topic
val topic = "spark-topic"       // how can i confirm this topic is exist in kafka server ? 

//define producer properties
val props = new java.util.Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("client.id", "KafkaProducer")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.connect.json.JsonSerializer")

//create producer instance
val kafkaProducer = new KafkaProducer[String, JsonNode](props)

//create object mapper
val mapper = new ObjectMapper with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

//mapper Json object to string
      def toJson(value: Any): String = {
        mapper.writeValueAsString(value)
      }


//send producer message

    val jsonstring =
      s"""{
         | "id": "0001",
         | "name": "Peter"
         |}
      """.stripMargin

    val jsonNode: JsonNode = mapper.readTree(jsonstring)
    val rec = new ProducerRecord[String, JsonNode](topic, jsonNode)
    kafkaProducer.send(rec)
    //println(rec)

  }

}

1 Ответ

0 голосов
/ 11 декабря 2018

1) Рекомендуемый способ проверить, существует ли тема, - это использовать AdminClient API.

Вы можете использовать listTopics() или describeTopics().

2) Если у вас нет привилегированного доступа к кластеру (для проверки метрик или тестов живучести), единственный способ проверить, работает ли кластер, - это попытаться подключиться к нему или использовать его.

С AdminClient вы можете использовать, например, describeCluster(), чтобы попытаться получить состояние кластера.

...