Я пытаюсь создать приложение Play- Scala, которое использует Scala Kafka Consumer для прослушивания брокера Kafka. Я использую библиотеку Cake Solutions Scala Kafka Client и следую их примеру здесь .
Я создал содержащий класс, который будет выступать в качестве поставщика Kafka для потребителей. , и я связал это как активный одноэлемент, чтобы он создавался при запуске приложения.
Проблема в том, что потребитель будет слушать посредника при запуске приложения, но не после этого.
Вот мой код для ConsumerProvider:
trait KafkaConsumerProvider {
def consumer: ActorRef
}
@Singleton
class KafkaConsumerProviderImpl @Inject() (actorSystem: ActorSystem, configuration: Configuration)
extends KafkaConsumerProvider {
private val consumerConf: KafkaConsumer.Conf[String, String] = KafkaConsumer.Conf(
keyDeserializer = new StringDeserializer,
valueDeserializer = new StringDeserializer,
bootstrapServers = configuration.get[String]("messageBroker.bootstrapServers"),
groupId = configuration.get[String]("messageBroker.consumer.groupId"),
enableAutoCommit = false,
autoCommitInterval= 1000,
sessionTimeoutMs = 10000,
maxPartitionFetchBytes = ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES,
maxPollRecords = 500,
maxPollInterval = 300000,
maxMetaDataAge = 300000,
autoOffsetReset = OffsetResetStrategy.LATEST,
isolationLevel = IsolationLevel.READ_UNCOMMITTED,
)
private val actorConf: KafkaConsumerActor.Conf = KafkaConsumerActor.Conf(
scheduleInterval = 1.seconds, // scheduling interval for Kafka polling when consumer is inactive
unconfirmedTimeout = 3.seconds, // duration for how long to wait for a confirmation before redelivery
maxRedeliveries = 3 // maximum number of times a unconfirmed message will be redelivered
)
override val consumer: ActorRef = {
val receiverActor = actorSystem.actorOf(ReceiverActor.props)
val topics = configuration.get[String]("messageBroker.consumer.topics").split(",").toSeq
val _consumer = actorSystem.actorOf(KafkaConsumerActor.props(consumerConf, actorConf, receiverActor))
_consumer ! Subscribe.AutoPartition(topics)
_consumer
}
}
, и вот как я связываю зависимость как активный синглтон в Module.scala
:
class Module extends AbstractModule with ScalaModule {
override def configure(): Unit = {
bind[KafkaMessageBrokerWriter].to[KafkaMessageBrokerWriterImpl].asEagerSingleton()
bind[KafkaConsumerProvider].to[KafkaConsumerProviderImpl].asEagerSingleton()
}
}
Как заставить потребителя продолжать слушать?