KS-средство потоковой передачи pySpark с ошибкой интеграции Kafka - PullRequest
0 голосов
/ 10 октября 2019

Я пытаюсь работать со встроенным кодом Apache Spark с интеграцией Kafka для онлайн-кода k-средних с именем «streaming_k_means_example.py». Вместо того, чтобы читать данные из файла, я пытаюсь читать данные из Кафки. Сначала я использовал метод

records = KafkaUtils.createStream(ssc, 'localhost.com:2181', 'spark-streaming', {'topic1':1}).map(parse)
    testingData = records.map( lambda elem: list(elem))

Однако, когда я запускаю программу, она выдает ошибку -

c = list(c) # Make it a list so we can compute its length

TypeError: 'TransformedDStream'объект не повторяется.

Поэтому я попытался работать со структурированным форматом интеграции kafka + apache Spark -

df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscibe", "topic1").option("startingOffsets", "latest").load()

Но здесь, когда я запускаю его с помощью следующей команды -

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 ./examples/src/main/python/mllib/streaming_test1.py

Выдает ошибку типа:

  File "/home/merlin/Experiments/spark/./examples/src/main/python/mllib/streaming_test1.py", line 44, in <module>
    df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscibe", "topic1").option("startingOffsets", "latest").load()
  File "/home/merlin/Experiments/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 403, in load
  File "/home/merlin/Experiments/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/home/merlin/Experiments/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/home/merlin/Experiments/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o41.load.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.kafka010.KafkaSourceProvider could not be instantiated

Используемая версия spark - 2.3.0, Scala - 2.11.12, Pyspark - 2.4.4, Кафка версия 2.11-1.1.0, Python - 2.7

Любая помощь для успешного запуска с kafka приветствуется

...