Как объявить topi c для KafkaListener в Scala, используя значение свойства - PullRequest
0 голосов
/ 04 февраля 2020

У меня есть простой проект Kafka / Scala, который создает продюсера. Однако теперь я пытаюсь создать получателя, когда использую следующий код ...

@Service
class KafkaService @Autowired()(producer: KafkaTemplate[String, Array[Byte]]){

  @Value("${spring.kafka.topic}") val topic : String = null

  def sendMessage(msg: String): Unit = {
    System.out.println(s"Writing the message $msg to the topic ${this.topic}")
    producer.send(topic, msg.getBytes());
  }

  @KafkaListener(id="test", topics="${this.topic}")
  def consume(record: ConsumerRecord[String, String]): Unit = {
    System.out.println(s"Consumed Strinsg Message : ${record.value()}")
  }

}

Я получаю следующую ошибку ...

[ERROR] ...\service\KafkaService.scala:26: error: type mismatch;
[ERROR]  found   : String("${this.topic}")
[ERROR]  required: Array[String]
[ERROR]   @KafkaListener(id="test", topics="${this.topic}")

Чего мне не хватает ?

Я также попробовал следующее ...

@Configuration
public class CommonConfiguration{
    ...
    @Value("${spring.kafka.topic}")
    public String topic;
    ...
}
@Service
class KafkaService @Autowired()(producer: KafkaTemplate[String, Array[Byte]], config: CommonConfiguration){

  def sendMessage(msg: String): Unit = {
    val topics : Array[String] = config.getTopics();
    println(s"Writing the message $msg ${topics.mkString(" ")}")
    producer.send(config.topic, msg.getBytes());
  }

  @KafkaListener(id="test", topics="#{config.topic.split(',')}")
  def consume(record: ConsumerRecord[String, String]): Unit = {
    System.out.println(s"Consumed Strinsg Message : ${record.value()}")
  }

}

Все еще не повезло, но консольный журнал производителя получает правильное значение.

Ответы [ 2 ]

0 голосов
/ 05 февраля 2020
@KafkaListener(id="scala", topics=Array("#{'${spring.kafka.topic}'.split(',')}"))

для получения дополнительной информации см. этот вопрос или этот вопрос

0 голосов
/ 04 февраля 2020

Ошибка показывает, что вам нужен тип Array [String] для тем, в то время как вы указали тип String. Вам нужно будет преобразовать строку темы в массив.

Аннотации очень похожи на Spring, где вы можете сделать что-то вроде

@Value("#{'${kafka.topic}'.split(',')}")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...