Pyspark. Похоже, вы пытаетесь сослаться на SparkContext из широковещательной переменной, действия или преобразования - PullRequest
0 голосов
/ 08 октября 2018

Я пытаюсь вычислить корреляцию Пирсона между двумя DStreams, используя скользящее окно в Pyspark.Но я продолжаю получать следующую ошибку:

Traceback (most recent call last): File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/examples/src/main/python/streaming/Cross-Corr.py", line 63, in <module> result = Statistics.corr(windowedds1,windowedds2, method="pearson") File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/stat/_statistics.py", line 157, in corr File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py", line 130, in callMLlibFunc File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py", line 122, in callJavaFunc File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py", line 87, in _py2java File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 555, in dumps File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/context.py", line 315, in __getnewargs__ Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

Ошибка исходит из этой строки:

result = Statistics.corr(windowedds1,windowedds2, method="pearson")

Сначала я прочиталстроки из 2 текстовых файлов и загрузите их в две темы Kafka, а затем примените оконную операцию к каждому DStream и вычислите корреляцию Пирсона между ними.

Вот мой код:

from __future__ import print_function
from future.builtins import *
from pyspark.ml.linalg import Vectors
from pyspark.mllib.stat import Statistics
from pyspark.ml.stat import Correlation
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import time
from collections import deque
import sys
from operator import add
import numpy as np
from itertools import chain
import warnings
from obspy import UTCDateTime
from obspy.signal.cross_correlation import templates_max_similarity
from obspy import read

if __name__ == "__main__":
    print("hello spark")

    sc = SparkContext("local[2]", appName="CrossCorrelation")
    ssc = StreamingContext(sc, 5)
    broker, topic1, topic2 = sys.argv[1:]
    # Connect to Kafka

    kvs1 = KafkaUtils.createStream(ssc, broker, "real-time-cross-correlation",{topic1:1})
    kvs2 = KafkaUtils.createStream(ssc, broker, "real-time-cross-correlation",{topic2:1})
    lines1 = kvs1.map(lambda x1: x1[1])
    ds1 = lines1.flatMap(lambda line1: line1.strip().split("\n")).map(lambda strelem1: float(strelem1))
    lines2 = kvs2.map(lambda x2: x2[1])
    ds2 = lines2.flatMap(lambda line2: line2.strip().split("\n")).map(lambda strelem2: float(strelem2))
    #Windowing
    windowedds1= ds1.window(10,5)
    windowedds2= ds2.window(10,5)
    #Correlation
    result = Statistics.corr(windowedds1,windowedds2, method="pearson")
    if result > 0.7:
        print("ds1 and ds2 are correlated!!!")

    ssc.start()
    ssc.awaitTermination()

кто-нибудь знает, что я делаю не так?

Спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...