Как я могу проверить, что смещение было совершено или нет для Кафки - PullRequest
0 голосов
/ 19 декабря 2018

У меня есть источник Akka Stream Kafka, который читает тему Kafka.

У меня есть простая задача, которая позволяет отключить фиксацию смещения сообщения.Фиксация обычно выполняется с помощью commitScaladsl.

Моя проблема в том, что я не знаю, как проверить, было ли смещение зафиксировано или нет.

Обычно мы используем EmbeddedKafka для тестирования, но у меня нетне нашел способ узнать последнее зафиксированное смещение.

Это пример написанного мною теста:

  "KafkaSource" should {
    "consume from a kafka topic and pass the message " in {
      val commitToKafka = true
      val key = "key".getBytes
      val message = "message".getBytes

      withRunningKafka {

        val source = getKafkaSource(commitToKafka)
        val (_, sub) = source
          .toMat(TestSink.probe[CommittableMessage[Array[Byte], Array[Byte], ConsumerMessage.CommittableOffset]])(Keep.both)
          .run()

        val messageOpt = publishAndRequestRetry(topic, key, message, sub, retries)
        messageOpt should not be empty
        messageOpt.get.value shouldBe message
      }
    }

Теперь я хочу добавить проверку на смещениебыть совершенным или нет.

Ответы [ 2 ]

0 голосов
/ 24 декабря 2018

Я наконец решил это, используя ConsumerInterceptor, определенный как:

class Interceptor extends ConsumerInterceptor[Array[Byte], Array[Byte]] {
  override def onConsume(records: ConsumerRecords[Array[Byte], Array[Byte]]): ConsumerRecords[Array[Byte], Array[Byte]] = records

  override def onCommit(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit = {
    import scala.collection.JavaConverters._
    OffsetRecorder.add(offsets.asScala)
  }

  override def close(): Unit = {}

  override def configure(configs: java.util.Map[String, _]): Unit = OffsetRecorder.clear

}

onCommit вызывается, когда фиксация завершена, в этом случае я просто записываю это.Я использую метод настройки, чтобы в начале каждого теста были пустые записи.

Затем, при создании пользовательских настроек для источника, я добавляю перехватчик как свойство:

  ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers(s"localhost:${kafkaPort}")
    .withGroupId("group-id")
    .withProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "package.of.my.test.Interceptor")
0 голосов
/ 19 декабря 2018

Кафка сохраняет смещения по TopicName и PartitionID.Таким образом, вы можете использовать метод .committed() или .position, чтобы проверить последнее зафиксированное смещение или текущую позицию потребителя Kafka.

commit () : Получить последнее зафиксированное смещение для данного раздела(произошла ли фиксация этим или другим процессом).

position () : Получить смещение следующей записи, которая будет извлечена (если существует запись с таким смещением).

...