Я пытаюсь работать со встроенным кодом Apache Spark с интеграцией Kafka для онлайн-кода k-средних с именем «streaming_k_means_example.py». Вместо того, чтобы читать данные из файла, я пытаюсь читать данные из Кафки. Сначала я использовал метод
records = KafkaUtils.createStream(ssc, 'localhost.com:2181', 'spark-streaming', {'topic1':1}).map(parse)
testingData = records.map( lambda elem: list(elem))
Однако, когда я запускаю программу, она выдает ошибку -
c = list(c) # Make it a list so we can compute its length
TypeError: 'TransformedDStream'объект не повторяется.
Поэтому я попытался работать со структурированным форматом интеграции kafka + apache Spark -
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscibe", "topic1").option("startingOffsets", "latest").load()
Но здесь, когда я запускаю его с помощью следующей команды -
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 ./examples/src/main/python/mllib/streaming_test1.py
Выдает ошибку типа:
File "/home/merlin/Experiments/spark/./examples/src/main/python/mllib/streaming_test1.py", line 44, in <module>
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscibe", "topic1").option("startingOffsets", "latest").load()
File "/home/merlin/Experiments/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 403, in load
File "/home/merlin/Experiments/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
File "/home/merlin/Experiments/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/home/merlin/Experiments/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o41.load.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.kafka010.KafkaSourceProvider could not be instantiated
Используемая версия spark - 2.3.0, Scala - 2.11.12, Pyspark - 2.4.4, Кафка версия 2.11-1.1.0, Python - 2.7
Любая помощь для успешного запуска с kafka приветствуется