Как разобрать json в искре с помощью пользовательской функции? - PullRequest
1 голос
/ 13 июля 2020

Я работаю над проектом, который включает в себя kafka, spark и hive. У меня есть пример такого события,

{"event": "OrderEvent", "messageid": "2db62eb5-de95-4ce8-8161-ab7552dc2fd7", "userid": "user-346", "lineitems": [{"productid": "product-784", "quantity": 3}, {"productid": "product-173", "quantity": 1}], "orderid": 50000}

Там есть задание потребителя, которое подписывается на kafka topi c и потребляет события, а затем записывает их в hdfs (расположение моей таблицы улья)

Моя проблема в том, что я хочу написать функцию для синтаксического анализа события json в строку для каждой строки, но я получил AttributeError: 'NoneType' object has no attribute 'repartition'

Вся моя работа потребителя похожа на

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
import json


class OrderEventConsumer:

def __init__(self):
    conf = SparkConf().setAppName('OrderEventConsumer')
    self.sc = SparkContext().getOrCreate(conf)
    self.sc.setLogLevel('ERROR')
    self.ssc = StreamingContext(self.sc, 5)
    self.ssc.checkpoint('/tmp/SparkCheckpoints')
    sqlContext = SQLContext(self.sc)

    # Kafka variables
    self.zkQuorum = 'localhost:2189'
    self.topic = 'test'  # 'prod-raw-product-view'

def format_event(self, rdd):
    for i in range(len(rdd['lineitems'])):
        yield '{},{},{},{},{},{}'.format(rdd['userid'], rdd['orderid'], rdd['lineitems'][i]['productid'],
                                         rdd['lineitems'][i]['quantity'], rdd['messageid'], rdd['event_time'])

def consume(self):
    kvs = KafkaUtils.createStream(self.ssc, self.zkQuorum, 'spark-streaming-consumer', {self.topic: 1})
    aRdd = kvs.map(lambda x: json.loads(x[1])) \
        .foreachRDD(lambda x: x.foreach(lambda x: self.format_event(x))) \
        .repartition(1) \
        .saveAsTextFiles('hdfs://node1/user/hive/warehouse/hb.db/fact_order/')
    self.ssc.start()
    self.ssc.awaitTermination()


if __name__ == '__main__':
    orderConsumer = OrderEventConsumer()
    orderConsumer.consume()

I хотите записать файл в hdfs, включает в себя количество элементов строки, умноженное на строку для каждого события. Как мне это сделать?

Спасибо.

1 Ответ

1 голос
/ 15 июля 2020

Вы не должны использовать функции foreachRDD и foreach - они не возвращают никаких данных. Если вы хотите отформатировать свои вещи, просто используйте map, как вы делали в предыдущей строке.

Также не используйте repartition, вместо этого используйте coalesce - это могло бы быть намного быстрее

PS Если вы только начинаете, я бы порекомендовал использовать Spark Structured Streaming - это может быть более производительно и легче подавать в суд, чем Spark Streaming.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...