Ошибка при создании универсального KafkaConsumer - PullRequest
0 голосов
/ 04 октября 2018

Я пытаюсь создать универсальный KafkaConsumer, где люди, использующие это, могут просто передавать типы для сериализатора и десериализатора.Я использую scala-kafka-client, предоставленный https://github.com/cakesolutions/scala-kafka-client. Вот мой код

package com.loci


import com.loci.config.LociConfiguration
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import cakesolutions.kafka.KafkaConsumer
import cakesolutions.kafka.akka.KafkaConsumerActor.{Confirm, Subscribe}
import cakesolutions.kafka.akka.{
  ConsumerRecords,
  Extractor,
  KafkaConsumerActor,
  Offsets
}
import com.typesafe.config.Config
import org.apache.kafka.clients.consumer.OffsetResetStrategy
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.Deserializer

import scala.concurrent.duration._

object LociKafkaConsumer extends LociConfiguration {

  def apply[K, V](config: com.typesafe.config.Config,
                  system: ActorSystem,
                  keySerializer: Deserializer[K],
                  valueSerializer: Deserializer[V],
                  lociConsumer: LociConsumer[K, V]): ActorRef = {

    val consumerConf = KafkaConsumer
      .Conf(keySerializer,
            valueSerializer,
            groupId = config.getString("group.id"),
            enableAutoCommit = false,
            autoOffsetReset = OffsetResetStrategy.EARLIEST)
      .withConf(this.conf)
    val actorConf = KafkaConsumerActor.Conf(1.seconds, 3.seconds)
    system.actorOf(
      Props(
        new LociKafkaConsumerSelfManaged[K, V](consumerConf,
                                               actorConf,
                                               config,
                                               lociConsumer)))
  }
}

class LociKafkaConsumerSelfManaged[K, V](kafkaConfig: KafkaConsumer.Conf[K, V],
                                         actorConfig: KafkaConsumerActor.Conf,
                                         config: Config,
                                         lociConsumer: LociConsumer[K, V])
    extends Actor
    with ActorLogging {

  val consumer: ActorRef = context.actorOf(
    KafkaConsumerActor.props(kafkaConfig, actorConfig, self)
  )

  val recordsExt: Extractor[Any, ConsumerRecords[K, V]] = ConsumerRecords.fromValues()

  consumer ! Subscribe.ManualOffset(
    Offsets(Map((new TopicPartition(config.getString("topic"), 0), 1))))

  override def receive: Receive = {

    // Records from Kafka
    case recordsExt(records) =>
      lociConsumer.processRecords(records)
      sender() ! Confirm(records.offsets)
  }

}

Я получаю эту ошибку

main/scala/com/loci/LociKafkaConsumer.scala:53:29: No TypeTag available for K [error] KafkaConsumerActor.props(kafkaConfig, actorConfig, self) [error] ^ [error] /media/hsingh/ExtraDrive1/projects/loci/loci-commons/src/main/scala/com/loci/LociKafkaConsumer.scala:56:84: No TypeTag available for K [error] val recordsExt: Extractor[Any, ConsumerRecords[K, V]] = ConsumerRecords.extractor[K,V]

Я использую scala 2.12

URL для git repo: Здесь

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...