Альпакка кафка потребительская офсет - PullRequest
1 голос
/ 03 апреля 2020

Я использую Alpakka-kafka в scala, чтобы потреблять кафку топи c. Вот мой код:

    val kafkaConsumerSettings: ConsumerSettings[String, String] =
      ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers(kafkaConfig.server)
        .withGroupId(kafkaConfig.group)
        .withProperties(
          ConsumerConfig.MAX_POLL_RECORDS_CONFIG       -> "100",
          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG      -> "earliest",
          CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> "SSL"
        )

    Consumer
        .plainSource(kafkaConsumerSettings, Subscriptions.topics(kafkaConfig.topic))
        .runWith(Sink.foreach(println))

Однако потребитель начинает опрос только с первого незафиксированного сообщения в topi c. Я хотел бы всегда начинать со смещения 0, независимо от того, какие сообщения принимаются. С потребителем Alpakka, как указать смещение вручную?

1 Ответ

1 голос
/ 17 апреля 2020

Я думаю, что вы хотите добавить пару записей конфигурации:

  1. ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> False, чтобы ваша работа никогда не сохраняла никакого смещения

  2. ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest" поэтому ваша работа начинается с самого начала.

Если ваша работа уже зафиксировала смещения в прошлом, вам, возможно, придется сбросить ее смещение до самого раннего.

...