Я хочу попробовать потоковую обработку с использованием pyspark и kafka, но вызов createDirectStream завершается с ошибкой TypeError:
import os

# Assembly JAR passed to Spark via --jars; the script relies on it to expose
# the org.apache.spark.streaming.kafka classes used by KafkaUtils.
# NOTE(review): the JAR is versioned 2.4.4 but lives under a spark-2.4.0
# directory — confirm it matches the installed pyspark version and that the
# file actually exists at this path.
KAFKA_ASSEMBLY_JAR = (
    '/root/soft/spark-2.4.0-bin-hadoop2.7/'
    'spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar'
)

# Must be set before the SparkContext is created below; presumably pyspark
# reads this variable only when it launches the JVM — verify.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars ' + KAFKA_ASSEMBLY_JAR + ' pyspark-shell'

import pyspark
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# NOTE(review): getOrCreate() hands back an already-running context when one
# exists; in that case the --jars setting above presumably has no effect —
# confirm no SparkContext was started earlier in the session.
sc = pyspark.SparkContext.getOrCreate()
ssc = StreamingContext(sc, 1)  # batch duration of 1 (seconds)

broker = "localhost:9092"
topics = ["new_topic"]
kafka_params = {"metadata.broker.list": broker}

# Receiver-less ("direct") stream over the listed topics.
directKafkaStream = KafkaUtils.createDirectStream(ssc, topics, kafka_params)
directKafkaStream.pprint()

ssc.start()
7 ssc = StreamingContext(sc,1)
8 broker ="localhost:9092"
----> 9 directKafkaStream = KafkaUtils.createDirectStream(ssc, ["new_topic"], {"metadata.broker.list": broker})
10 directKafkaStream.pprint()
11 ssc.start()
/apps/sh/tool/anaconda3/lib/python3.7/site-packages/pyspark/streaming/kafka.py in createDirectStream(ssc, topics, kafkaParams, fromOffsets, keyDecoder, valueDecoder, messageHandler)
136 return messageHandler(m)
137
--> 138 helper = KafkaUtils._get_helper(ssc._sc)
139
140 jfromOffsets = dict([(k._jTopicAndPartition(helper),
/apps/sh/tool/anaconda3/lib/python3.7/site-packages/pyspark/streaming/kafka.py in _get_helper(sc)
215 def _get_helper(sc):
216 try:
--> 217 return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
218 except TypeError as e:
219 if str(e) == "'JavaPackage' object is not callable":
TypeError: 'JavaPackage' object is not callable