Мы пытаемся объединить тему Kafka с включенными функциями SSL с версией Apache spark 1.6 pyspark и получаем следующее сообщение об ошибке. Подскажите, пожалуйста, как решить эту проблему.
Код CLI производителя:
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list dvtcbddd3001.corp.cox.com:9093 --security-protocol SASL_SSL --topic inst_monitor_status_test
>this is test message1
>this is test message2
>this is test message3
Команда CLI потребителя:
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --broker-list dvtcbddd3001.corp.cox.com:9093 --security-protocol SASL_SSL --topic inst_monitor_status_test --from-beginning
>this is test message1
>this is test message2
>this is test message3
Код Pyspark: (поток DS)
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from kafka import SimpleProducer, KafkaClient
def handler(message):
records = message.collect()
for record in records:
print(record)
def main():
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 10)
kvs = KafkaUtils.createDirectStream(ssc, ['inst_monitor_status_test'], {"metadata.broker.list": 'dvtcbddd3001.corp.cox.com:9093'})
kvs.foreachRDD(handler)
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
Сообщение об ошибке:
![enter image description here](https://i.stack.imgur.com/Uvem9.png)
Основываясь на приведенной ниже ссылке,
Direct Kafka Stream с PySpark (Apache Spark 1.6)
мы понимаем, что тема
не существует илиброкеры недоступны или есть проблема сетевого (прокси) типа. но мы не знаем, как это исправить. Пожалуйста, помогите нам решить эту проблему.