Я пытаюсь создать Kafka Consumer, который использует MongoDB-Spark-Connector в той же программе. Что-то вроде ввода Kafka как RDD -> в Dataframe, а затем сохранить его в MongoDB для дальнейшего использования.
Мой продюсер запущен, и «стандартный» потребитель выглядит так и получает сообщения красиво:
# Spark
from pyspark import SparkContext
# Spark Streaming
from pyspark.streaming import StreamingContext
# Kafka
from pyspark.streaming.kafka import KafkaUtils
# json parsing
import json
sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 30)
kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming-consumer', {'trump':1})
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
parsed.pprint()
ssc.start()
ssc.awaitTermination()
«Модифицированный» потребитель, которого я хочу использовать, который создается через SparkSessionBuilder с параметрами конфигурации для использования mongodb, выглядит следующим образом:
#Additional for Session Building and Preprocessing
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql import SparkSession
import collections
# Spark Streaming
from pyspark.streaming import StreamingContext
# Kafka
from pyspark.streaming.kafka import KafkaUtils
# json parsing
import json
# Build the SparkSession
spark = SparkSession.builder \
.master("local") \
.appName("TrumpTweets") \
.config("spark.executor.memory", "1gb") \
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/trumptweets.tweets") \
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/trumptweets.tweets") \
.getOrCreate()
ssc = StreamingContext(spark.sparkContext, 30)
kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming-consumer', {'trump':1})
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
parsed.pprint()
ssc.start()
ssc.awaitTermination()
работает нормально, но не получать любые сообщения ... Я не вижу ничего другого, кроме SessionBuilder, который не выдает никаких сообщений об ошибках или около того.
Пожалуйста, помогите мне, я действительно застрял на этом .. Любой другой способ также приветствуется!