Я хочу создать rdd из python-kafka в моем потоковом приложении.
Мой код:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from kafka import KafkaConsumer
conf = (SparkConf()
.setAppName("test"))
spark = SparkSession.builder \
.appName(" ") \
.config(conf=conf) \
.getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 15)
topic = 'mytopic'
consumer = KafkaConsumer('mytopic', group_id='mytopic-groupid', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest')
rdd = sc.parallelize([i for i in consumer] )
print(rdd.collect())
# Start the computation
ssc.start()
# Wait for the computation to terminate
ssc.awaitTermination()
Но когда я пытаюсь собрать и распечатать его, ничего не отображается.
Когда я просто вывожу как:
consumer = KafkaConsumer('mytopic', group_id='mytopic-groupid', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest')
for i in consumer:
print(i)
Выводит потребительский контент:
ConsumerRecord(topic='mytopic', partition=1, offset=53797, timestamp=1536916141939, timestamp_type=0, key=None, value=b'[22/Feb/2018 11:57:39 -0800] INFO 134.25.20.69 root - "POST /notebook/api/check_status HTTP/1.1"', checksum=None, serialized_key_size=-1, serialized_value_size=104)
ConsumerRecord(topic='mytopic', partition=1, offset=53798, timestamp=1536916141942, timestamp_type=0, key=None, value=b'[22/Feb/2018 11:57:39 -0800] INFO 134.25.20.69 user12 - "POST /notebook/api/check_status HTTP/1.1"', checksum=None, serialized_key_size=-1, serialized_value_size=104)
ConsumerRecord(topic='mytopic', partition=1, offset=53799, timestamp=1536916141943, timestamp_type=0, key=None, value=b'[22/Feb/2018 11:57:40 -0800] INFO 134.25.20.69 jhon - "POST /notebook/api/check_status HTTP/1.1"', checksum=None, serialized_key_size=-1, serialized_value_size=104)
Я знаю, что могу создать прямой поток с kafkautils, но я хочу понять,почему это не работает.Просьба подсказать, почему rdd не может быть создан внутри открытого потока, что я делаю не так?