Unable to consume messages from Kafka using Alpakka
0 votes
/ October 26, 2019

I am trying to consume messages from Kafka using Alpakka. The Akka actors do not report any error saying that the consumer has stopped, but the consumer does not receive any messages. My code is below:

val consumerSettings = ConsumerSettings(GatewaySettings.DataPlatformKafkaConsumer.kafkaConsumer, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId(groupId)
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
      .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")
      .withProperty("security.protocol", "SASL_SSL")
      .withProperty("ssl.truststore.type", "jks")
      .withProperty("sasl.mechanism", "PLAIN")
      .withProperty("ssl.truststore.location", "deployment/local/ssl/cacerts")
      .withProperty("ssl.truststore.password", "changeit")
      .withProperty("sasl.jaas.config", config)
Consumer
      .plainSource(consumerSettings, Subscriptions.topics("file-upload-alerts"))
      .map(msg => {
        println(msg.value())
      })
      .runWith(Sink.ignore)

I enabled debug logging for Akka and Kafka. The debug output is below:

2019-10-26 12:03:10,397 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-21 Revoked partitions: Set(file-upload-alerts-0). All partitions: Set()
2019-10-26 12:03:10,567 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-21 Assigned partitions: Set(file-upload-alerts-0). All partitions: Set(file-upload-alerts-0)
2019-10-26 12:03:10,568 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-21 Requesting messages, requestId: 2, partitions: Set(file-upload-alerts-0)
2019-10-26 12:03:13,684 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Revoked partitions: Set(file-upload-alerts-0). All partitions: Set()
2019-10-26 12:03:13,929 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Assigned partitions: Set(file-upload-alerts-0). All partitions: Set(file-upload-alerts-0)
2019-10-26 12:03:13,930 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Requesting messages, requestId: 3, partitions: Set(file-upload-alerts-0)
2019-10-26 12:03:20,084 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-23 Revoked partitions: Set(file-upload-alerts-0). All partitions: Set()
2019-10-26 12:03:23,248 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Assigned partitions: Set(file-upload-alerts-0). All partitions: Set(file-upload-alerts-0)
2019-10-26 12:03:23,248 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Requesting messages, requestId: 4, partitions: Set(file-upload-alerts-0)
2019-10-26 12:03:47,375 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-23 Revoked partitions: Set(file-upload-alerts-0). All partitions: Set()
2019-10-26 12:03:47,549 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-23 Assigned partitions: Set(file-upload-alerts-0). All partitions: Set(file-upload-alerts-0)
2019-10-26 12:03:47,549 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-23 Requesting messages, requestId: 5, partitions: Set(file-upload-alerts-0)
2019-10-26 12:07:03,189 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Revoked partitions: Set(file-upload-alerts-0). All partitions: Set()
2019-10-26 12:07:03,891 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Assigned partitions: Set(file-upload-alerts-0). All partitions: Set(file-upload-alerts-0)
2019-10-26 12:07:03,891 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Requesting messages, requestId: 6, partitions: Set(file-upload-alerts-0)
2019-10-26 12:07:25,142 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Revoked partitions: Set(file-upload-alerts-0). All partitions: Set()
2019-10-26 12:07:25,381 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Assigned partitions: Set(file-upload-alerts-0). All partitions: Set(file-upload-alerts-0)
2019-10-26 12:07:25,381 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Requesting messages, requestId: 7, partitions: Set(file-upload-alerts-0)
2019-10-26 12:07:28,539 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-26 Revoked partitions: Set(file-upload-alerts-0). All partitions: Set()
2019-10-26 12:07:28,780 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-26 Assigned partitions: Set(file-upload-alerts-0). All partitions: Set(file-upload-alerts-0)
2019-10-26 12:07:28,781 DEBUG   axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-26 Requesting messages, requestId: 8, partitions: Set(file)

Can anyone help me?

1 Answer

1 vote
/ October 26, 2019

Your map(msg => println(msg.value())) turns every element of the Source into Unit, because println returns Unit. To pass msg.value() downstream and print it there, try:

Consumer
  .plainSource(consumerSettings, Subscriptions.topics("file-upload-alerts"))
  .map(msg => msg.value())         // or .map(_.value)
  .runWith(Sink.foreach(println))  // or .runForeach(println)
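
The difference between the two map variants can be seen without a Kafka broker. The following is a minimal sketch, assuming Akka 2.6 or later (where an implicit ActorSystem is enough to materialize a stream) and akka-stream on the classpath. The Record case class, the MapToUnitDemo object and the sample values are hypothetical stand-ins for the ConsumerRecord elements that Consumer.plainSource emits; they are not part of the original code.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for a Kafka ConsumerRecord[String, String].
final case class Record(key: String, value: String)

object MapToUnitDemo {

  // Runs the same in-memory records through both variants of `map`
  // and returns what each stream delivered to its sink.
  def run(records: List[Record])(implicit system: ActorSystem): (Seq[Unit], Seq[String]) = {
    // Variant from the question: println returns Unit, so the sink only sees ().
    val units = Await.result(
      Source(records).map(r => println(r.value)).runWith(Sink.seq),
      5.seconds
    )
    // Variant from the answer: map extracts the value and the sink receives it.
    val values = Await.result(
      Source(records).map(_.value).runWith(Sink.seq),
      5.seconds
    )
    (units, values)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("map-to-unit-demo")
    val (units, values) = run(List(Record("k1", "alert-1"), Record("k2", "alert-2")))
    println(s"Elements after map(println): $units")
    println(s"Elements after map(_.value): $values")
    system.terminate()
  }
}
```

In the first stream the values are still printed as a side effect, but nothing downstream can use them. In the second stream the sink receives the actual strings.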

Also, consider using onComplete to check whether the stream succeeded or failed, and to terminate the actor system:

val consumeMsgs: Future[Done] = Consumer
  ...

consumeMsgs onComplete {
  case Success(_) => println("Done"); system.terminate()
  case Failure(e) => println(e.getMessage); system.terminate()
}
...
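
The onComplete pattern above can be tried end to end with an in-memory source. This is only a sketch under stated assumptions: Akka 2.6 or later, a hypothetical OnCompleteDemo object, and a hypothetical consume helper that replaces Consumer.plainSource(...) with a fixed list of strings. The elided parts of the answer's code are not reconstructed here.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object OnCompleteDemo {

  // Hypothetical helper: prints every value and completes with Done
  // once the in-memory source is exhausted.
  def consume(values: List[String])(implicit system: ActorSystem): Future[Done] =
    Source(values).runWith(Sink.foreach[String](v => println(v)))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("on-complete-demo")
    // onComplete needs an ExecutionContext; the system dispatcher is used here.
    implicit val ec: ExecutionContext = system.dispatcher

    val consumeMsgs: Future[Done] = consume(List("alert-1", "alert-2"))

    consumeMsgs.onComplete {
      case Success(_) =>
        println("Done")
        system.terminate()
      case Failure(e) =>
        // A failed stream surfaces here instead of being silently dropped.
        println(e.getMessage)
        system.terminate()
    }
  }
}
```

With the Kafka source from the answer in place of the in-memory one, the Failure branch is where a stream error would be reported.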