KafkaUtils.createDirectStream fails with TypeError - PullRequest
November 7, 2019

I want to try stream processing with pyspark and Kafka, but createDirectStream fails with a TypeError:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /root/soft/spark-2.4.0-bin-hadoop2.7/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar pyspark-shell'
import pyspark
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
sc = pyspark.SparkContext.getOrCreate()
ssc = StreamingContext(sc,1)
broker ="localhost:9092"
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["new_topic"], {"metadata.broker.list": broker})
directKafkaStream.pprint()
ssc.start()

      7 ssc = StreamingContext(sc,1)
      8 broker ="localhost:9092"
----> 9 directKafkaStream = KafkaUtils.createDirectStream(ssc, ["new_topic"], {"metadata.broker.list": broker})
     10 directKafkaStream.pprint()
     11 ssc.start()

/apps/sh/tool/anaconda3/lib/python3.7/site-packages/pyspark/streaming/kafka.py in createDirectStream(ssc, topics, kafkaParams, fromOffsets, keyDecoder, valueDecoder, messageHandler)
    136             return messageHandler(m)
    137 
--> 138         helper = KafkaUtils._get_helper(ssc._sc)
    139 
    140         jfromOffsets = dict([(k._jTopicAndPartition(helper),

/apps/sh/tool/anaconda3/lib/python3.7/site-packages/pyspark/streaming/kafka.py in _get_helper(sc)
    215     def _get_helper(sc):
    216         try:
--> 217             return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
    218         except TypeError as e:
    219             if str(e) == "'JavaPackage' object is not callable":

TypeError: 'JavaPackage' object is not callable
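
The `'JavaPackage' object is not callable` error generally means the JVM could not resolve `org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper`, which happens when the Kafka assembly jar never reached the classpath. One thing worth ruling out is a wrong jar path. Below is a minimal sketch of such a check; `build_submit_args` is a hypothetical helper name, not part of pyspark, and the path in the usage note is simply the one from the snippet above.

```python
import os


def build_submit_args(jar_path):
    """Return a PYSPARK_SUBMIT_ARGS value for jar_path.

    Hypothetical helper (not a pyspark API): it only checks that the jar
    file exists on disk before it is handed to spark-submit via --jars.
    """
    if not os.path.isfile(jar_path):
        raise FileNotFoundError("Kafka assembly jar not found: " + jar_path)
    return "--jars {} pyspark-shell".format(jar_path)
```

It would be used as `os.environ['PYSPARK_SUBMIT_ARGS'] = build_submit_args('/root/soft/spark-2.4.0-bin-hadoop2.7/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar')`, before the first SparkContext is created. If a context already exists in the session (as can happen in a notebook), `getOrCreate()` reuses it, and a value set afterwards has no effect on the already running JVM.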

...