Как инициируются и запускаются потребители со структурированной потоковой передачей при чтении разделов кафки с несколькими разделами? - PullRequest
0 голосов
/ 10 июня 2019

Если в теме kakfa имеется более одного раздела, в java эти экземпляры / потоки потребителя будут созданы на стороне потребителя.

Как это может быть обработано на стороне потребителя с потоковой передачей искры?Я не нахожу много информации относительно того же самого.Любой образец для того же, т. Е. Призыв к нескольким потребителям в спринг-потоке-потребителе темы.

Любое предложение / примеры дизайна будут очень благодарны.

С уважением, Шьям

Ответы [ 2 ]

2 голосов
/ 10 июня 2019

Spark Streaming всегда считывает данные параллельно со всех доступных разделов в Kafka, если у spark достаточно ресурсов. это идет из коробки с Spark, и нам не нужно писать никакой код для этого.

например, если ваша тема Kafka имеет 4 раздела, то если вы запустите Ваша искровая работа с двумя исполнителями по 2 ядра, а затем ваша искровая работа запустит 4 задания для чтения данных параллельно с 4 кафки Перегородки.

Не стесняйтесь комментировать, если вам нужна дополнительная информация.

https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

import java.sql.Timestamp

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import java.time.{LocalDate, LocalDateTime}
import java.util.Calendar



object SparkKafka {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("test_app")
      .getOrCreate()
    val sparkContext = spark.sparkContext


    val ssc = new StreamingContext(sparkContext, Seconds(1)) // the polling frequency is 2 seconds, can be modified based on the BM requirements.
///val currentHour = now.get(Calendar.HOUR_OF_DAY)

    log.info("Before starting the Stream -->>")
    val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String]
      (Array.apply("Kafka_topic_name"), getKafkaParams()))

      .map(record => record.value)

    stream.foreachRDD { rdd =>

      try {
        if (!rdd.isEmpty()) {
          log.info("rdd is not empty and saving to -->>"+LocalDate.now.getYear+"/"+LocalDate.now.getMonth+"/"+LocalDate.now.getDayOfMonth+"/"+LocalDateTime.now().getHour)
          rdd.saveAsTextFile("hdfs:///<folder to save>") //TODO::: Externalize the HDFS location to Props




          LocalDate.now.getMonth


         if (null != args && null != args {
            0
          } && args {
            0
          }.equals("log")) {
            rdd.foreach(x => print("Message read and saved TO S3 bucket----*****--->>" + x))
          }
        }
      } catch {

        case t: Throwable =>
          t.printStackTrace() // TODO: handle error)
          log.error("Exception occured while processing the data exception is {}", t.getCause)
      }
    }

    ssc.start()
    log.info("started now-->> " + compat.Platform.currentTime)
    ssc.awaitTermination()

  }

  def getKafkaParams(): Map[String, Object] = {
    Map[String, Object]("bootstrap.servers" -> "host:port
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "Group_Name",
      //      "sasl.kerberos.service.name" -> "kafka",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean))
  }

}
2 голосов
/ 10 июня 2019

Если Kafka имеет более одного раздела, это означает, что потребители могут извлечь выгоду из этого, выполнив определенную задачу параллельно. В частности, spark-streaming внутренне может ускорить работу, увеличив параметр num-executors. Это связано с количеством разделов, которые есть у Kafka, например , если у вас столько же разделов Kafka, сколько у num-executors в spark, теоретически все исполнители могут читать все разделы одновременно, что, очевидно, увеличивает пропускная способность системы.

...