Я работаю над проектом, который включает в себя kafka, spark и hive. У меня есть пример такого события,
{"event": "OrderEvent", "messageid": "2db62eb5-de95-4ce8-8161-ab7552dc2fd7", "userid": "user-346", "lineitems": [{"productid": "product-784", "quantity": 3}, {"productid": "product-173", "quantity": 1}], "orderid": 50000}
Там есть задание потребителя, которое подписывается на kafka topi c и потребляет события, а затем записывает их в hdfs (расположение моей таблицы улья)
Моя проблема в том, что я хочу написать функцию для синтаксического анализа события json в строку для каждой строки, но я получил AttributeError: 'NoneType' object has no attribute 'repartition'
Вся моя работа потребителя похожа на
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
import json
class OrderEventConsumer:
def __init__(self):
conf = SparkConf().setAppName('OrderEventConsumer')
self.sc = SparkContext().getOrCreate(conf)
self.sc.setLogLevel('ERROR')
self.ssc = StreamingContext(self.sc, 5)
self.ssc.checkpoint('/tmp/SparkCheckpoints')
sqlContext = SQLContext(self.sc)
# Kafka variables
self.zkQuorum = 'localhost:2189'
self.topic = 'test' # 'prod-raw-product-view'
def format_event(self, rdd):
for i in range(len(rdd['lineitems'])):
yield '{},{},{},{},{},{}'.format(rdd['userid'], rdd['orderid'], rdd['lineitems'][i]['productid'],
rdd['lineitems'][i]['quantity'], rdd['messageid'], rdd['event_time'])
def consume(self):
kvs = KafkaUtils.createStream(self.ssc, self.zkQuorum, 'spark-streaming-consumer', {self.topic: 1})
aRdd = kvs.map(lambda x: json.loads(x[1])) \
.foreachRDD(lambda x: x.foreach(lambda x: self.format_event(x))) \
.repartition(1) \
.saveAsTextFiles('hdfs://node1/user/hive/warehouse/hb.db/fact_order/')
self.ssc.start()
self.ssc.awaitTermination()
if __name__ == '__main__':
orderConsumer = OrderEventConsumer()
orderConsumer.consume()
I хотите записать файл в hdfs, включает в себя количество элементов строки, умноженное на строку для каждого события. Как мне это сделать?
Спасибо.