Как сделать потоковую передачу искры в каждой партии при ограничении размера партии Kafka? - PullRequest
0 голосов
/ 04 мая 2020

Чтобы ограничить размер пакета при использовании потоковой передачи Spark, я сослался на этот ответ

В Кафке хранится около 50 миллионов записей (которые должны быть использованы). Топи c имеет 3 раздела.

zhihu_comment   0          10906153        28668062        17761909        -               -               -
zhihu_comment   1          10972464        30271728        19299264        -               -               -
zhihu_comment   2          10906395        28662007        17755612        -               -               -

Мое потребительское приложение:

public final class SparkConsumer {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    String brokers = "device1:9092,device2:9092,device3:9092";
    String groupId = "spark";
    String topics = "zhihu_comment";

    // Create context with a certain seconds batch interval
    SparkConf sparkConf = new SparkConf().setAppName("TestKafkaStreaming");
    sparkConf.set("spark.streaming.backpressure.enabled", "true");
    sparkConf.set("spark.streaming.backpressure.initialRate", "10000");
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

    Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    kafkaParams.put("enable.auto.commit", true);
    kafkaParams.put("max.poll.records", "500");


    // Create direct kafka stream with brokers and topics
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
            jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(ConsumerRecord::value);
    lines.count().print();

    jssc.start();
    jssc.awaitTermination();
  }
}

Я ограничил потребляющий размер потоковой передачи искры, в моем случае я установил maxRatePerPartition до 10000, что означает, что в моем случае он потреблял 300000 записей на одну партию.

Вопрос в том, что хотя потоковая передача с плавающей запятой может обрабатывать записи с указанным c пределом, the current offset showing by kafka is not the offset that spark streaming is handling. As the kafka's current offset suddenly goes down to latest offset:

zhihu_comment   0          28700537        28700676        139             consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102  consumer-1
zhihu_comment   1          30305102        30305224        122             consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102  consumer-1
zhihu_comment   2          28695033        28695146        113             consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102  consumer-1

Похоже, что потоковая передача Spark не фиксирует смещение в каждом пакете, она фиксирует самое последнее смещение в начале, когда оно начинает потреблять!

Есть ли способ сделать фиксацию потоковой искры с каждым пакетом?

Журнал потоковой передачи Spark, подтверждающий количество записей, которое он потреблял в каждом пакете:

20/05/04 22:28:13 INFO scheduler.DAGScheduler: Job 15 finished: print at SparkConsumer.java:65, took 0.012606 s
-------------------------------------------
Time: 1588602490000 ms
-------------------------------------------
300000

20/05/04 22:28:13 INFO scheduler.JobScheduler: Finished job streaming job 1588602490000 ms.0 from job set of time 1588602490000 ms

1 Ответ

1 голос
/ 04 мая 2020

Вам нужно отключить

kafkaParams.put("enable.auto.commit", false);

и использовать

messages.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // do here some transformations and action on the rdd, typically like:
  // rdd.foreachPartition(it -> {
  //   it.foreach(row -> ...)
  // })

  // some time later, after outputs have completed
  ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
});

, как описано в Spark + Kafka Руководство по интеграции .

Вы также можете использовать commitSync для синхронных фиксаций.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...