Я создал один образец Direct Kafka Stream в Spark. Кафка имеет 30 разделов данного топи c. Но все потребители работают с одного и того же исполнителя.
Скриншот Kafka Manager.
Согласно моему пониманию, в прямом Kafka Stream, Driver дает смещения исполнителям и опрашивает их.
Версия Spark: 2.4
Пример кода ниже:
import com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import java.util.Arrays;
import java.util.HashMap;
public class Main {
public static void main(String[] args) throws InterruptedException {
SparkConf conf = new SparkConf().setAppName("StreamingTest");
conf.set("spark.shuffle.service.enabled", "true");
conf.set("spark.streaming.kafka.maxRatePerPartition", "100");
conf.set("spark.streaming.backpressure.enabled", "true");
conf.set("spark.streaming.concurrentJobs", "1");
conf.set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC");
conf.set("spark.streaming.backpressure.pid.minRate", "1500");
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
JavaInputDStream<ConsumerRecord<Object, Object>> kafkaStream1 = createKafkaStream(ssc, "test-topic-1");
kafkaStream1.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
System.out.println("Processing test-topic-1");
try {
Thread.sleep(2);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
})));
kafkaStream1.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
final OffsetRange[] beginOffsets = Arrays.stream(offsetRanges).map(o -> OffsetRange.create(o.topicPartition(), 0, o.fromOffset())).toArray(OffsetRange[]::new);
rdd.foreachPartition(partition -> {
OffsetRange o = beginOffsets[TaskContext.get().partitionId()];
});
((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(beginOffsets);
});
ssc.start();
ssc.awaitTermination();
}
public static JavaInputDStream<ConsumerRecord<Object, Object>> createKafkaStream(JavaStreamingContext ssc, String topic) {
HashMap<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-ids>");
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, topic+"hrishi-testing-nfr-7");
kafkaParams.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000);
kafkaParams.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 80000);
kafkaParams.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
kafkaParams.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000000);
kafkaParams.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000);
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(ImmutableList.of(topic), kafkaParams));
}
}