Я уже видел подобный вопрос, как нажмите здесь
Но все же я хочу знать, если потоковая передача данных из определенного раздела невозможна?Я использовал Потребительские стратегии Kafka в Spark Streaming метод подписки .
ConsumerStrategies.Subscribe [String, String] (themes, kafkaParams, смещения)
Это фрагмент кода, который я опробовал для подписки на тему и раздел,
val topics = Array("cdc-classic")
val topic="cdc-classic"
val partition=2;
val offsets=
Map(new TopicPartition(topic, partition) -> 2L)//I am not clear with this line, (I tried to set topic and partition number as 2)
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams,offsets))
Но когда я запускаю этот код, я получаю следующее исключение:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage 0.0 (TID 5, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
PS: cdc-classic - это название темы с 17 разделами