Я пытаюсь использовать сообщения от kafka с помощью alpakka. Я не получаю никакой ошибки от актеров akka, что потребитель прекратил, но он не получает никаких сообщений. Ниже приведен мой код
val consumerSettings = ConsumerSettings(GatewaySettings.DataPlatformKafkaConsumer.kafkaConsumer, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId(groupId)
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
.withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")
.withProperty("security.protocol", "SASL_SSL")
.withProperty("ssl.truststore.type", "jks")
.withProperty("sasl.mechanism", "PLAIN")
.withProperty("ssl.truststore.location", "deployment/local/ssl/cacerts")
.withProperty("ssl.truststore.password", "changeit")
.withProperty("sasl.jaas.config", config)
Consumer
.plainSource(consumerSettings, Subscriptions.topics("file-upload-alerts"))
.map(msg => {
println(msg.value())
})
.runWith(Sink.ignore)
Я добавил отладочные операторы для akka и kafka, а ниже приведены мои отладочные операторы
2019-10-26 12:03:10,397 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-21 Revoked partitions: Set(file-upload-alerts-0). All partitions: Set()
2019-10-26 12:03:10,567 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-21 Assigned partitions: Set(file-upload-alerts-0). All partitions: Set(file-upload-alerts-0)
2019-10-26 12:03:10,568 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-21 Requesting messages, requestId: 2, partitions: Set(file-upload-alerts-0)
2019-10-26 12:03:13,684 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Revoked partitions: Set(file-upload-alerts-0). All partitions: Set()
2019-10-26 12:03:13,929 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Assigned partitions: Set(file-upload-alerts-0). All partitions: Set(file-upload-alerts-0)
2019-10-26 12:03:13,930 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Requesting messages, requestId: 3, partitions: Set(file-upload-alerts-0)
2019-10-26 12:03:20,084 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-23 Revoked partitions: Set(file-upload-alerts-0). All partitions: Set()
2019-10-26 12:03:23,248 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Assigned partitions: Set(file-upload-alerts-0). All partitions: Set(file-upload-alerts-0)
2019-10-26 12:03:23,248 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Requesting messages, requestId: 4, partitions: Set(file-upload-alerts-0)
2019-10-26 12:03:47,375 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-23 Revoked partitions: Set(file-upload-alerts-0). All partitions: Set()
2019-10-26 12:03:47,549 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-23 Assigned partitions: Set(file-upload-alerts-0). All partitions: Set(file-upload-alerts-0)
2019-10-26 12:03:47,549 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-23 Requesting messages, requestId: 5, partitions: Set(file-upload-alerts-0)
2019-10-26 12:07:03,189 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Revoked partitions: Set(file-upload-alerts-0). All partitions: Set()
2019-10-26 12:07:03,891 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Assigned partitions: Set(file-upload-alerts-0). All partitions: Set(file-upload-alerts-0)
2019-10-26 12:07:03,891 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Requesting messages, requestId: 6, partitions: Set(file-upload-alerts-0)
2019-10-26 12:07:25,142 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Revoked partitions: Set(file-upload-alerts-0). All partitions: Set()
2019-10-26 12:07:25,381 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Assigned partitions: Set(file-upload-alerts-0). All partitions: Set(file-upload-alerts-0)
2019-10-26 12:07:25,381 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-27 Requesting messages, requestId: 7, partitions: Set(file-upload-alerts-0)
2019-10-26 12:07:28,539 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-26 Revoked partitions: Set(file-upload-alerts-0). All partitions: Set()
2019-10-26 12:07:28,780 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-26 Assigned partitions: Set(file-upload-alerts-0). All partitions: Set(file-upload-alerts-0)
2019-10-26 12:07:28,781 DEBUG axe-gateway akka.kafka.internal.SingleSourceLogic SingleSourceLogic(akka://gateway-client) gateway-client-akka.actor.default-dispatcher-26 Requesting messages, requestId: 8, partitions: Set(file)
Может кто-нибудь помочь мне?