Создание RDD из объектов consumerRecord от потребителя Kafka - PullRequest
0 голосов
/ 14 сентября 2018

Я хочу создать rdd из python-kafka в моем потоковом приложении.

Мой код:

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from kafka import KafkaConsumer


conf = (SparkConf()
       .setAppName("test"))

spark = SparkSession.builder \
     .appName(" ") \
     .config(conf=conf) \
     .getOrCreate()

sc = spark.sparkContext
ssc = StreamingContext(sc, 15)

topic = 'mytopic'

consumer = KafkaConsumer('mytopic', group_id='mytopic-groupid', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest')

rdd = sc.parallelize([i for i in consumer] )
print(rdd.collect())

# Start the computation
ssc.start()

# Wait for the computation to terminate
ssc.awaitTermination()

Но когда я пытаюсь собрать и распечатать его, ничего не отображается.

Когда я просто вывожу как:

consumer = KafkaConsumer('mytopic', group_id='mytopic-groupid', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest')
for i in consumer:
    print(i)

Выводит потребительский контент:

ConsumerRecord(topic='mytopic', partition=1, offset=53797, timestamp=1536916141939, timestamp_type=0, key=None, value=b'[22/Feb/2018 11:57:39 -0800] INFO     134.25.20.69 root - "POST /notebook/api/check_status HTTP/1.1"', checksum=None, serialized_key_size=-1, serialized_value_size=104)
ConsumerRecord(topic='mytopic', partition=1, offset=53798, timestamp=1536916141942, timestamp_type=0, key=None, value=b'[22/Feb/2018 11:57:39 -0800] INFO     134.25.20.69 user12 - "POST /notebook/api/check_status HTTP/1.1"', checksum=None, serialized_key_size=-1, serialized_value_size=104)
ConsumerRecord(topic='mytopic', partition=1, offset=53799, timestamp=1536916141943, timestamp_type=0, key=None, value=b'[22/Feb/2018 11:57:40 -0800] INFO     134.25.20.69 jhon - "POST /notebook/api/check_status HTTP/1.1"', checksum=None, serialized_key_size=-1, serialized_value_size=104)

Я знаю, что могу создать прямой поток с kafkautils, но я хочу понять,почему это не работает.Просьба подсказать, почему rdd не может быть создан внутри открытого потока, что я делаю не так?

...