Spark DStream от Kafka всегда начинается с самого начала - PullRequest
0 голосов
/ 04 июля 2018

Посмотрите на мой последний комментарий принятого ответа для решения

Я настроил DStream так:

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "kafka1.example.com:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[KafkaAvroDeserializer],
    "group.id" -> "mygroup",
    "specific.avro.reader" -> true,
    "schema.registry.url" -> "http://schema.example.com:8081"
  )

  val stream = KafkaUtils.createDirectStream(
    ssc,
    PreferConsistent,
    Subscribe[String, DataFile](topics, kafkaParams)
  )

Хотя это работает, и я получаю DataFile s, как и ожидалось, когда я останавливаю и перезапускаю задание, оно всегда начинается в начале темы. Как я могу добиться, чтобы оно продолжалось там, где оно в последний раз ушло?

Продолжение 1

Как и в ответе Бхима Рао Гогинени, я изменил свою конфигурацию так:

val consumerParams =
  Map("bootstrap.servers" -> bootstrapServerString,
      "schema.registry.url" -> schemaRegistryUri.toString,
      "specific.avro.reader" -> "true",
      "group.id" -> "measuring-data-files",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[KafkaAvroDeserializer],
      "enable.auto.commit" -> (false: JavaBool),
      "auto.offset.reset" -> "earliest")

И я настроил поток:

val stream = KafkaUtils.
  createDirectStream(ssc,
                     LocationStrategies.PreferConsistent,
                     ConsumerStrategies.Subscribe[String, DataFile](List(inTopic), consumerParams))

А потом я обрабатываю это:

stream.
  foreachRDD { rdd =>
    ... // Do stuff with the RDD - transform, produce to other topic etc.
    // Commit the offsets
    log.info("Committing the offsets")
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)        
  }

Но он все равно всегда начинается с самого начала при повторном запуске.

Вот выдержка из моего журнала Кафки:

Пробег:

[2018-07-04 07:47:31,593] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 22 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:47:31,594] INFO [GroupCoordinator 0]: Stabilized group measuring-data-files generation 23 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:47:31,599] INFO [GroupCoordinator 0]: Assignment received from leader for group measuring-data-files for generation 23 (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:06,690] INFO [ProducerStateManager partition=data-0] Writing producer snapshot at offset 131488999 (kafka.log.ProducerStateManager)
[2018-07-04 07:48:06,690] INFO [Log partition=data-0, dir=E:\confluent-4.1.1\data\kafka] Rolled new log segment at offset 131488999 in 1 ms. (kafka.log.Log)
[2018-07-04 07:48:10,788] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-07-04 07:48:30,074] INFO [GroupCoordinator 0]: Member consumer-1-262ece09-93c4-483e-b488-87057578dabc in group measuring-data-files has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:30,074] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 23 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:30,074] INFO [GroupCoordinator 0]: Group measuring-data-files with generation 24 is now empty (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:45,761] INFO [ProducerStateManager partition=data-0] Writing producer snapshot at offset 153680971 (kafka.log.ProducerStateManager)
[2018-07-04 07:48:45,763] INFO [Log partition=data-0, dir=E:\confluent-4.1.1\data\kafka] Rolled new log segment at offset 153680971 in 3 ms. (kafka.log.Log)
[2018-07-04 07:49:24,819] INFO [ProducerStateManager partition=data-0] Writing producer snapshot at offset 175872864 (kafka.log.ProducerStateManager)
[2018-07-04 07:49:24,820] INFO [Log partition=data-0, dir=E:\confluent-4.1.1\data\kafka] Rolled new log segment at offset 175872864 in 1 ms. (kafka.log.Log)

Следующий прогон:

[2018-07-04 07:50:13,748] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 24 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:13,749] INFO [GroupCoordinator 0]: Stabilized group measuring-data-files generation 25 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:13,754] INFO [GroupCoordinator 0]: Assignment received from leader for group measuring-data-files for generation 25 (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:43,758] INFO [GroupCoordinator 0]: Member consumer-1-906c2eaa-f012-4283-96fc-c34582de33fb in group measuring-data-files has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:43,758] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 25 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:43,758] INFO [GroupCoordinator 0]: Group measuring-data-files with generation 26 is now empty (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)

Продолжение 2

Я сделал сохранение смещений более подробным, как это:

    // Commit the offsets
    log.info("Committing the offsets")
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    if(offsetRanges.isEmpty) {
      log.info("Offset ranges is empty...")
    } else {
      log.info("# offset ranges: %d" format offsetRanges.length)
    }
    object cb extends OffsetCommitCallback {

      def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata],
                     exception: Exception): Unit =
        if(exception != null) {
          log.info("Commit FAILED")
          log.error(exception.getMessage, exception)
        } else {
          log.info("Commit SUCCEEDED - count: %d" format offsets.size())
          offsets.
            asScala.
            foreach {
              case (p, omd) =>
                log.info("partition = %d; topic = %s; offset = %d; metadata = %s".
                  format(p.partition(), p.topic(), omd.offset(), omd.metadata()))
            }
        }
    }
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, cb)

И я получаю это исключение:

2018-07-04 10:14:00 ERROR DumpTask$:136 - Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:600)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:541)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:163)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:182)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:209)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Как мне решить эту проблему?

Ответы [ 2 ]

0 голосов
/ 04 июля 2018

С новым API Spark Kafka Connect мы можем попробовать асинхронные коммиты.

Считайте смещения и сделайте коммит, сделанный в процессе.

Кафка конфиг для того же:

enable.auto.commit=false

auto.offset.reset=earliest или auto.offset.reset=latest -> эта конфигурация вступает в силу, если в теме Kafka нет последнего принятого смещения, то она будет считывать смещения от начала или до конца на основе этой конфигурации.

 stream.foreachRDD { rdd =>
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

   // some time later, after outputs have completed
   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

Вот источник: https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

0 голосов
/ 04 июля 2018

Spark предоставляет два API для чтения сообщений от kafka.

Из документации Spark

Подход 1: Подход на основе приемника

Этот подход использует приемник для получения данных. Приемник реализован с использованием Кафки потребительский API высокого уровня. Как и во всех приемниках, данные, полученные от Кафка через Receiver хранится в Spark Executors, а затем в заданиях запущенный Spark Streaming обрабатывает данные.

Подход 2: Прямой Подход (Без Приемников)

Этот новый «прямой» подход без приемника был введен в Spark 1.3 обеспечить более прочные сквозные гарантии. Вместо того чтобы использовать приемники для получения данных, этот подход периодически запрашивает Kafka для последних смещений в каждой теме + раздел, и соответственно определяет диапазоны смещения для обработки в каждой партии. Когда рабочие места для обрабатывать данные запускаются, простой потребительский API Kafka используется для читать определенные диапазоны смещений из Кафки (аналогично чтению файлов из файловой системы).
Обратите внимание, что одним из недостатков этого подхода является что он не обновляет смещения в Zookeeper , следовательно, на основе Zookeeper Инструменты мониторинга Kafka не будут показывать прогресс. Тем не менее, вы можете получить доступ смещения, обработанные этим подходом в каждой партии и обновлении Зоопарк себя

В вашем случае вы используете прямой подход, поэтому вам нужно самостоятельно обработать смещение сообщения и указать диапазон смещения, с которого вы хотите прочитать сообщения. Или, если вы хотите, чтобы zookeeper позаботился о смещении сообщений, вы можете использовать подход на основе Receiver, используя KafkaUtils.createStream() API.

Подробнее о том, как обрабатывать смещение кафки, можно найти в документации spark .

...