Я пытаюсь вычислить корреляцию Пирсона между двумя DStreams, используя скользящее окно в Pyspark.Но я продолжаю получать следующую ошибку:
Traceback (most recent call last):
File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/examples/src/main/python/streaming/Cross-Corr.py", line 63, in <module>
result = Statistics.corr(windowedds1,windowedds2, method="pearson")
File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/stat/_statistics.py", line 157, in corr
File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py", line 130, in callMLlibFunc
File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py", line 122, in callJavaFunc
File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py", line 87, in _py2java
File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 555, in dumps
File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/context.py", line 315, in __getnewargs__
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
Ошибка исходит из этой строки:
result = Statistics.corr(windowedds1,windowedds2, method="pearson")
Сначала я прочиталстроки из 2 текстовых файлов и загрузите их в две темы Kafka, а затем примените оконную операцию к каждому DStream и вычислите корреляцию Пирсона между ними.
Вот мой код:
from __future__ import print_function
from future.builtins import *
from pyspark.ml.linalg import Vectors
from pyspark.mllib.stat import Statistics
from pyspark.ml.stat import Correlation
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import time
from collections import deque
import sys
from operator import add
import numpy as np
from itertools import chain
import warnings
from obspy import UTCDateTime
from obspy.signal.cross_correlation import templates_max_similarity
from obspy import read
if __name__ == "__main__":
print("hello spark")
sc = SparkContext("local[2]", appName="CrossCorrelation")
ssc = StreamingContext(sc, 5)
broker, topic1, topic2 = sys.argv[1:]
# Connect to Kafka
kvs1 = KafkaUtils.createStream(ssc, broker, "real-time-cross-correlation",{topic1:1})
kvs2 = KafkaUtils.createStream(ssc, broker, "real-time-cross-correlation",{topic2:1})
lines1 = kvs1.map(lambda x1: x1[1])
ds1 = lines1.flatMap(lambda line1: line1.strip().split("\n")).map(lambda strelem1: float(strelem1))
lines2 = kvs2.map(lambda x2: x2[1])
ds2 = lines2.flatMap(lambda line2: line2.strip().split("\n")).map(lambda strelem2: float(strelem2))
#Windowing
windowedds1= ds1.window(10,5)
windowedds2= ds2.window(10,5)
#Correlation
result = Statistics.corr(windowedds1,windowedds2, method="pearson")
if result > 0.7:
print("ds1 and ds2 are correlated!!!")
ssc.start()
ssc.awaitTermination()
кто-нибудь знает, что я делаю не так?
Спасибо.