Spark Streaming readStream не может читать с безопасного Kafka (EventStreams) - PullRequest
1 голос
/ 19 мая 2019

Я пытаюсь отправить данные из программы в защищенный кластер Kafka (EventStreams в IBM Cloud - Cloud Foundry Services), а затем в своем потребительском приложении (которое является потоковой передачей), я пытаюсь прочитать данныеиз того же источника кафки.

Вот Properties, который я устанавливаю внутри производителя:

def getProperties: Properties = {
    val configs = new Properties()

    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-java-console-sample-producer")
    configs.put(ProducerConfig.ACKS_CONFIG, "1")
    configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "<url:port for 5 brokers>")
    configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL")
    configs.put(SaslConfigs.SASL_MECHANISM, "PLAIN")
    configs.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"" + "<some apikey here>" + "\";")
    configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2")
    configs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2")
    configs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS")

    configs
}

А вот код, который я использую для отправки данных в Кафкуcluster:

val producer = new KafkaProducer[String , String](getProperties)

/** Reading the file line by line */

for (line <- file.getLines) {
    /** Sending the lines to the $topic inside kafka cluster initialized inside $producer */
    val data = new ProducerRecord[String , String](topic , "key" , line)
    producer.send(data)
}

Я могу подтвердить, что при этом данные отправляются в кластер Kafka, так как я могу просматривать данные, поступающие в кластер, с использованием метрик Grafana, предоставляемых облаком IBM.

Теперь, в моем коде потокового воспроизведения, вот как я пытаюсь читать из источника kafka:

val df = spark.readStream
        .format("kafka")
        .option("subscribe", "raw_weather")
        .option("kafka.bootstrap.servers", "<url:port for the same 5 brokers>")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism" , "PLAIN")
        .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"" + "<that same password given above>" + "\";")
        .option("kafka.ssl.protocol", "TLSv1.2")
        .option("kafka.ssl.enabled.protocols", "TLSv1.2")
        .option("kafka.ssl.endpoint.identification.algorithm", "HTTPS")
        .load()
        .selectExpr("CAST(value as STRING)")
        .as[String]

, за которым следует:

val query= df.writeStream
    .outputMode(OutputMode.Append())
    .foreachBatch((df: DataFrame , id: Long) => {
        println(df.count() + " " + id)
    })
    .trigger(Trigger.ProcessingTime(1))
    .start()

query.awaitTermination()

Я не уверен какпочему, но мой Spark Streaming вообще не может читать данные из источника.Когда я запускаю программу Stream Streaming, она показывает это в выходных данных:

19/05/19 04:22:28 DEBUG SparkEnv: Using serializer: class org.apache.spark.serializer.JavaSerializer
19/05/19 04:22:28 INFO SparkEnv: Registering MapOutputTracker
19/05/19 04:22:28 INFO SparkEnv: Registering BlockManagerMaster
19/05/19 04:22:28 INFO SparkEnv: Registering OutputCommitCoordinator
0 0

И однажды, когда я снова запускаю моего производителя, Stream Stream по-прежнему остается там на 0 0.Я не уверен в том, что я написал здесь неправильно.

РЕДАКТИРОВАТЬ: сохранить потребитель работает более 7 часов, по-прежнему без изменений

...