Потоковая передача данных с использованием Spark с отдельного раздела в темах Kafka - PullRequest
0 голосов
/ 07 июня 2018

Я уже видел подобный вопрос, как нажмите здесь

Но все же я хочу знать, если потоковая передача данных из определенного раздела невозможна?Я использовал Потребительские стратегии Kafka в Spark Streaming метод подписки .

ConsumerStrategies.Subscribe [String, String] (themes, kafkaParams, смещения)

Это фрагмент кода, который я опробовал для подписки на тему и раздел,

val topics = Array("cdc-classic")
val topic="cdc-classic"
val partition=2;
val offsets= 
Map(new TopicPartition(topic, partition) -> 2L)//I am not clear with this line, (I tried to set topic and partition number as 2)
val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams,offsets))

Но когда я запускаю этот код, я получаю следующее исключение:

     Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage 0.0 (TID 5, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)

PS: cdc-classic - это название темы с 17 разделами

Ответы [ 2 ]

0 голосов
/ 07 июня 2018

Укажите номер раздела и начальное смещение раздела для потоковой передачи данных в этой строке,

Map(new TopicPartition(topic, partition) -> 2L)

где,

  • раздел - номер раздела

  • 2L относится к начальному числу смещения раздела.

Затем мы можем передавать данные с выбранных разделов.

0 голосов
/ 07 июня 2018

Раздел Кафки - это модуль распараллеливания Spark.Так что даже если технически это было бы каким-то образом возможно, это не имеет смысла, поскольку все данные будут обрабатываться одним исполнителем.Вместо того, чтобы использовать Spark для него, вы можете просто запустить процесс как KafkaConsumer:

 String topic = "foo";
 TopicPartition partition0 = new TopicPartition(topic, 0);
 TopicPartition partition1 = new TopicPartition(topic, 1);
 consumer.assign(Arrays.asList(partition0, partition1));

(https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html)

Если вы хотите получать прибыль от автоматических повторных попыток Spark, вы можете просто создатьобраз Docker с этим и запустите его, например, с Kubernetes с соответствующей конфигурацией повтора.

Что касается Spark, если вы действительно хотите его использовать, вы должны проверить, каково смещение прочитанного раздела.укажите неверное значение, и оно вернет вам сообщение о смещении «вне диапазона» (может начинаться с 0?).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...