Spark Structred Streaming Kafka - как читать из определенного раздела темы и делать офсетное управление - PullRequest
0 голосов
/ 29 мая 2019

Я новичок в искровом структурированном потоке и офсетном управлении кафкой.Использование spark-streaming-kafka-0-10-2.11.В потребительском, как я могу читать из определенного раздела темы?

comapany_df = sparkSession
                      .readStream()
                      .format("kafka")
                      .option("kafka.bootstrap.servers", applicationProperties.getProperty(BOOTSTRAP_SERVERS_CONFIG))
                      .option("subscribe", topicName)

Я использую что-то вроде выше.как указать определенный раздел для чтения?

1 Ответ

0 голосов
/ 30 мая 2019

Вы можете использовать следующий блок кода для чтения из определенного раздела Kafka.

public void processKafka() throws InterruptedException {
    LOG.info("************ SparkStreamingKafka.processKafka start");

   // Create the spark application
    SparkConf sparkConf = new SparkConf();
    sparkConf.set("spark.executor.cores", "5");

    //To express any Spark Streaming computation, a StreamingContext object needs to be created. 
    //This object serves as the main entry point for all Spark Streaming functionality.
    //This creates the spark streaming context with a 'numSeconds' second batch size
    jssc = new JavaStreamingContext(sparkConf, Durations.seconds(sparkBatchInterval));


    //List of parameters
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", this.getBrokerList());
    kafkaParams.put("client.id", "SpliceSpark");
    kafkaParams.put("group.id", "mynewgroup");
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("enable.auto.commit", false);
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);

    List<TopicPartition> topicPartitions= new ArrayList<TopicPartition>();
    for(int i=0; i<5; i++) {
        topicPartitions.add(new TopicPartition("mytopic", i));
    }


    //List of kafka topics to process
    Collection<String> topics = Arrays.asList(this.getTopicList().split(","));


    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
            jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
          );

    //Another version of an attempt
    /*
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Assign(topicPartitions, kafkaParams)
      );
     */

    messages.foreachRDD(new PrintRDDDetails());


    // Start running the job to receive and transform the data 
    jssc.start();

    //Allows the current thread to wait for the termination of the context by stop() or by an exception
    jssc.awaitTermination();
}
...