У меня есть проблема с pyspark:
Я должен объединить два dstream в один dstream, но, к сожалению, я не получил отпечаток.
Это мой код:
sc = SparkContext(appName="Sparkstreaming")
spark = SparkSession(sc)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc,3)
kafka_stream = KafkaUtils.createStream(ssc,"localhost:2181","consumer-be-borsa",{"Be_borsa":1})
kafka_stream1 = KafkaUtils.createStream(ssc,"localhost:2181","consumer-be-teleborsa",{"Be_teleborsa":1})
dstream = kafka_stream.map(lambda k, v: json.loads(v['id']))
dstream1 = kafka_stream1.map(lambda k, v : json.loads(v['id']))
# Join
streamJoined = dstream.join(dstream1)
streamJoined.pprint()
ssc.start()
time.sleep(100)
ssc.stop()
Это два JSON файла, которые я потребляю:
{"id": "Be_20200330", "Date": "2020-03-30", "Name": "Be", "Hour": "15.49.24", "Last Price": "0,862", "Var%": "-1,93", "Last Value": "1.020"}
{"id": "Be_20200330", "Date": "2020-03-30", "Name": "Be", "Volatility": "2,352", "Value_at_risk": "5,471"}
Результат, который я хотел бы получить:
{"id": "Be_20200330", "Date": "2020-03-30", "Name": "Be","Hour": "15.49.24", "Last Price": "0,862", "Var%": "-1,93", "Last Value": "1.020", "Volatility": "2,352", "Value_at_risk": "5,471"}
Как мне использовать pyspark ?
Я также пытался увидеть решение этой ссылки: Объединение двух потоков искр по ключу , но оно не работает. Спасибо