Как правильно проверить операции смещения группы потребителей Kafka? - PullRequest
0 голосов
/ 07 мая 2019

У нас есть класс utils, который будет сбрасывать смещения для данной группы потребителей.Вот текущий код (в нем есть ошибка, из-за которой он плохо обрабатывает пропущенные смещения, и это побудило меня углубиться в это):

def resetOffsets(topics: Seq[String], groupId: String)
                  (implicit ec: ExecutionContext): Future[Map[TopicPartition, Long]] = {
    // TODO add error handling so that an empty reduceLeft is not happening
    Future {
      withConsumer(groupId) { consumer =>
        topics.map { topic =>
          val partitions = consumer.partitionsFor(topic)
          val tps = partitions.asScala.map(p => new TopicPartition(topic, p.partition())).asJava
          val offsets = consumer.beginningOffsets(tps).asScala
          consumer.commitSync(offsets.mapValues(new OffsetAndMetadata(_)).asJava)
          offsets
        }.reduce(_ ++ _).mapValues(l => l: Long).toMap
      }
    }
  }

Я пытаюсь настроить тестовый жгутчтобы утверждать, что это поведение на самом деле происходит так, как я ожидаю, но я сталкиваюсь с постоянными проблемами со сроками, группами потребителей не создаются и т. д ...

"A KafkaHelper" should "reset offsets" in {
    val kafkaHelper = new KafkaHelper(helperSettings)

    val groupId = GroupIdBase + s"_${new Random().nextInt(Integer.MAX_VALUE)}"

    println("Group id " + groupId)

    val consumer = createConsumer(groupId)

    consumer.assign(List(Partition).asJava)

    println(consumer.subscription())

    consumer.seek(Partition, 1L)

    consumer.commitSync()

    println(consumer.position(Partition))

    whenReady(kafkaHelper.resetOffsets(Seq(Topic), groupId)) { result =>
      result shouldEqual Map(Partition -> 0L)
      consumer.position(Partition) shouldEqual 0L
    }
  }

То, что я пытаюсь сделать, это просто прогрессгруппе потребителей (или установите для нее какое-то смещение вручную), вызовите мою функцию сброса и затем подтвердите, что группа потребителей действительно была сброшена до 0.

Некоторые проблемы, с которыми я столкнулся:

  • Пока я не попаду в блок whenReady, моя группа потребителей даже не создается.Это вызывает проблемы с методом subscribe, а затем пытается проверить текущий раздел.
  • Поэтому, чтобы обойти это, я попытался вручную назначить раздел.Я не уверен, что это хорошая идея.

Как правильно создать тестовый набор из этого кода, чтобы я мог создать автоматизированный набор тестов?Я использую локальный экземпляр докера Kafka для проверки.Сначала я начал использовать https://github.com/manub/scalatest-embedded-kafka,, так как это то, что мы используем для тестирования большей части нашего кода, обращенного к Kafka, но затем решил отказаться от него, так как не знал, реализует ли он базовое поведение так же, как обычный кластер Kafka..

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...