Чтобы ограничить размер пакета при использовании потоковой передачи Spark, я сослался на этот ответ
В Кафке хранится около 50 миллионов записей (которые должны быть использованы). Топи c имеет 3 раздела.
zhihu_comment 0 10906153 28668062 17761909 - - -
zhihu_comment 1 10972464 30271728 19299264 - - -
zhihu_comment 2 10906395 28662007 17755612 - - -
Мое потребительское приложение:
public final class SparkConsumer {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
String brokers = "device1:9092,device2:9092,device3:9092";
String groupId = "spark";
String topics = "zhihu_comment";
// Create context with a certain seconds batch interval
SparkConf sparkConf = new SparkConf().setAppName("TestKafkaStreaming");
sparkConf.set("spark.streaming.backpressure.enabled", "true");
sparkConf.set("spark.streaming.backpressure.initialRate", "10000");
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaParams.put("enable.auto.commit", true);
kafkaParams.put("max.poll.records", "500");
// Create direct kafka stream with brokers and topics
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
// Get the lines, split them into words, count the words and print
JavaDStream<String> lines = messages.map(ConsumerRecord::value);
lines.count().print();
jssc.start();
jssc.awaitTermination();
}
}
Я ограничил потребляющий размер потоковой передачи искры, в моем случае я установил maxRatePerPartition
до 10000, что означает, что в моем случае он потреблял 300000 записей на одну партию.
Вопрос в том, что хотя потоковая передача с плавающей запятой может обрабатывать записи с указанным c пределом, the current offset showing by kafka is not the offset that spark streaming is handling. As the kafka's current offset suddenly goes down to latest offset
:
zhihu_comment 0 28700537 28700676 139 consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102 consumer-1
zhihu_comment 1 30305102 30305224 122 consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102 consumer-1
zhihu_comment 2 28695033 28695146 113 consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102 consumer-1
Похоже, что потоковая передача Spark не фиксирует смещение в каждом пакете, она фиксирует самое последнее смещение в начале, когда оно начинает потреблять!
Есть ли способ сделать фиксацию потоковой искры с каждым пакетом?
Журнал потоковой передачи Spark, подтверждающий количество записей, которое он потреблял в каждом пакете:
20/05/04 22:28:13 INFO scheduler.DAGScheduler: Job 15 finished: print at SparkConsumer.java:65, took 0.012606 s
-------------------------------------------
Time: 1588602490000 ms
-------------------------------------------
300000
20/05/04 22:28:13 INFO scheduler.JobScheduler: Finished job streaming job 1588602490000 ms.0 from job set of time 1588602490000 ms