Kafkastream не прослушивает ввод при создании через SparkSession Builder - PullRequest
0 голосов
/ 06 февраля 2020

Я пытаюсь создать Kafka Consumer, который использует MongoDB-Spark-Connector в той же программе. Что-то вроде ввода Kafka как RDD -> в Dataframe, а затем сохранить его в MongoDB для дальнейшего использования.

Мой продюсер запущен, и «стандартный» потребитель выглядит так и получает сообщения красиво:

#    Spark
from pyspark import SparkContext
#    Spark Streaming
from pyspark.streaming import StreamingContext
#    Kafka
from pyspark.streaming.kafka import KafkaUtils
#    json parsing
import json

sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("WARN")

ssc = StreamingContext(sc, 30)

kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming-consumer', {'trump':1})

parsed = kafkaStream.map(lambda v: json.loads(v[1]))
parsed.pprint()

ssc.start()
ssc.awaitTermination()

«Модифицированный» потребитель, которого я хочу использовать, который создается через SparkSessionBuilder с параметрами конфигурации для использования mongodb, выглядит следующим образом:

#Additional for Session Building and Preprocessing
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql import SparkSession
import collections

#    Spark Streaming
from pyspark.streaming import StreamingContext
#    Kafka
from pyspark.streaming.kafka import KafkaUtils
#    json parsing
import json

# Build the SparkSession
spark = SparkSession.builder \
   .master("local") \
   .appName("TrumpTweets") \
   .config("spark.executor.memory", "1gb") \
   .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/trumptweets.tweets") \
   .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/trumptweets.tweets") \
   .getOrCreate()

ssc = StreamingContext(spark.sparkContext, 30)


kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming-consumer', {'trump':1})

parsed = kafkaStream.map(lambda v: json.loads(v[1]))
parsed.pprint()

ssc.start()
ssc.awaitTermination()

работает нормально, но не получать любые сообщения ... Я не вижу ничего другого, кроме SessionBuilder, который не выдает никаких сообщений об ошибках или около того.

Пожалуйста, помогите мне, я действительно застрял на этом .. Любой другой способ также приветствуется!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...