Я использую Alpakka-kafka в scala, чтобы потреблять кафку топи c. Вот мой код:
val kafkaConsumerSettings: ConsumerSettings[String, String] =
ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(kafkaConfig.server)
.withGroupId(kafkaConfig.group)
.withProperties(
ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "100",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> "SSL"
)
Consumer
.plainSource(kafkaConsumerSettings, Subscriptions.topics(kafkaConfig.topic))
.runWith(Sink.foreach(println))
Однако потребитель начинает опрос только с первого незафиксированного сообщения в topi c. Я хотел бы всегда начинать со смещения 0, независимо от того, какие сообщения принимаются. С потребителем Alpakka, как указать смещение вручную?