Как получить приложение Scala Kafka Consumer в Play для непрерывного прослушивания брокера в течение всего срока его службы? - PullRequest
0 голосов
/ 31 марта 2020

Я пытаюсь создать приложение Play- Scala, которое использует Scala Kafka Consumer для прослушивания брокера Kafka. Я использую библиотеку Cake Solutions Scala Kafka Client и следую их примеру здесь .

Я создал содержащий класс, который будет выступать в качестве поставщика Kafka для потребителей. , и я связал это как активный одноэлемент, чтобы он создавался при запуске приложения.

Проблема в том, что потребитель будет слушать посредника при запуске приложения, но не после этого.

Вот мой код для ConsumerProvider:

trait KafkaConsumerProvider {

  def consumer: ActorRef

}

@Singleton
class KafkaConsumerProviderImpl @Inject() (actorSystem: ActorSystem, configuration: Configuration)
    extends KafkaConsumerProvider {

  private val consumerConf: KafkaConsumer.Conf[String, String] = KafkaConsumer.Conf(
    keyDeserializer = new StringDeserializer,
    valueDeserializer = new StringDeserializer,
    bootstrapServers = configuration.get[String]("messageBroker.bootstrapServers"),
    groupId = configuration.get[String]("messageBroker.consumer.groupId"),
    enableAutoCommit = false,
    autoCommitInterval= 1000,
    sessionTimeoutMs = 10000,
    maxPartitionFetchBytes = ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES,
    maxPollRecords = 500,
    maxPollInterval = 300000,
    maxMetaDataAge  = 300000,
    autoOffsetReset = OffsetResetStrategy.LATEST,
    isolationLevel = IsolationLevel.READ_UNCOMMITTED,
  )

  private val actorConf: KafkaConsumerActor.Conf = KafkaConsumerActor.Conf(
    scheduleInterval = 1.seconds,   // scheduling interval for Kafka polling when consumer is inactive
    unconfirmedTimeout = 3.seconds, // duration for how long to wait for a confirmation before redelivery
    maxRedeliveries = 3             // maximum number of times a unconfirmed message will be redelivered
  )

  override val consumer: ActorRef = {
    val receiverActor = actorSystem.actorOf(ReceiverActor.props)
    val topics = configuration.get[String]("messageBroker.consumer.topics").split(",").toSeq
    val _consumer = actorSystem.actorOf(KafkaConsumerActor.props(consumerConf, actorConf, receiverActor))
    _consumer ! Subscribe.AutoPartition(topics)
    _consumer
  }

}

, и вот как я связываю зависимость как активный синглтон в Module.scala:

class Module extends AbstractModule with ScalaModule {

  override def configure(): Unit = {
    bind[KafkaMessageBrokerWriter].to[KafkaMessageBrokerWriterImpl].asEagerSingleton()
    bind[KafkaConsumerProvider].to[KafkaConsumerProviderImpl].asEagerSingleton()
  }

}

Как заставить потребителя продолжать слушать?

1 Ответ

0 голосов
/ 01 апреля 2020

Проблема была в том, что в ReceiverActor я забыл подтвердить смещения:

sender() ! Confirm(records.offsets)
...