У нас есть класс utils, который будет сбрасывать смещения для данной группы потребителей.Вот текущий код (в нем есть ошибка, из-за которой он плохо обрабатывает пропущенные смещения, и это побудило меня углубиться в это):
def resetOffsets(topics: Seq[String], groupId: String)
(implicit ec: ExecutionContext): Future[Map[TopicPartition, Long]] = {
// TODO add error handling so that an empty reduceLeft is not happening
Future {
withConsumer(groupId) { consumer =>
topics.map { topic =>
val partitions = consumer.partitionsFor(topic)
val tps = partitions.asScala.map(p => new TopicPartition(topic, p.partition())).asJava
val offsets = consumer.beginningOffsets(tps).asScala
consumer.commitSync(offsets.mapValues(new OffsetAndMetadata(_)).asJava)
offsets
}.reduce(_ ++ _).mapValues(l => l: Long).toMap
}
}
}
Я пытаюсь настроить тестовый жгутчтобы утверждать, что это поведение на самом деле происходит так, как я ожидаю, но я сталкиваюсь с постоянными проблемами со сроками, группами потребителей не создаются и т. д ...
"A KafkaHelper" should "reset offsets" in {
val kafkaHelper = new KafkaHelper(helperSettings)
val groupId = GroupIdBase + s"_${new Random().nextInt(Integer.MAX_VALUE)}"
println("Group id " + groupId)
val consumer = createConsumer(groupId)
consumer.assign(List(Partition).asJava)
println(consumer.subscription())
consumer.seek(Partition, 1L)
consumer.commitSync()
println(consumer.position(Partition))
whenReady(kafkaHelper.resetOffsets(Seq(Topic), groupId)) { result =>
result shouldEqual Map(Partition -> 0L)
consumer.position(Partition) shouldEqual 0L
}
}
То, что я пытаюсь сделать, это просто прогрессгруппе потребителей (или установите для нее какое-то смещение вручную), вызовите мою функцию сброса и затем подтвердите, что группа потребителей действительно была сброшена до 0.
Некоторые проблемы, с которыми я столкнулся:
- Пока я не попаду в блок
whenReady
, моя группа потребителей даже не создается.Это вызывает проблемы с методом subscribe
, а затем пытается проверить текущий раздел. - Поэтому, чтобы обойти это, я попытался вручную назначить раздел.Я не уверен, что это хорошая идея.
Как правильно создать тестовый набор из этого кода, чтобы я мог создать автоматизированный набор тестов?Я использую локальный экземпляр докера Kafka для проверки.Сначала я начал использовать https://github.com/manub/scalatest-embedded-kafka,, так как это то, что мы используем для тестирования большей части нашего кода, обращенного к Kafka, но затем решил отказаться от него, так как не знал, реализует ли он базовое поведение так же, как обычный кластер Kafka..