Передать контекст зажигания как параметр между файлами в PySpark - PullRequest
0 голосов
/ 05 сентября 2018

Фрагменты кода:

Файл 1: master.py

# Spark Imports
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext

#Import self defined function
from helper import enrichment


def ingestion(sc,ssc):
    # Work with stream
    kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "streaming-consumer", {topic: 1})
    # Call function defined in helper.py
    enriched_data = kafkaStream_json.map(lambda single_log:enrichment(single_log,client_id,machine_id))

if __name__ == "__main__":
    # Name of Spark App
    conf = SparkConf().setAppName("Test")

    # Spark and Spark streaming configuration
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)
    ingestion(sc,ssc)
    # Start the stream and keep it running unless terminated
    ssc.start()
    ssc.awaitTermination()

Файл2: helper.py

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
def enrichment():
    test_df = pd.DataFrame(some operations...)
    spark_df = sqlContext.createDataFrame(test_df)
    ...

Возникли проблемы:

Потоковая часть этого работает нормально, однако, когда я вызываю функцию enrichment, это следующие проблемы, с которыми я сталкиваюсь в зависимости от использования:

Case1: Когда приведенный выше пример запускается, он говорит:

spark_df = sqlContext.createDataFrame(test_df)
NameError: global name 'sqlContext' is not defined

Случай 2: Когда я передаю искровой контекст в качестве аргумента, это сообщение появляется:

"Исключение: похоже, вы пытаетесь сослаться SparkContext из широковещательной переменной, действия или трансформации. SparkContext может использоваться только в драйвере, а не в коде, который он запускает на рабочих. Для получения дополнительной информации см. SPARK-5063. "

Это самое близкое решение, которое я нашел: ОШИБКА: SparkContext может использоваться только в драйвере, а не в коде, который он запускает на рабочих. Для получения дополнительной информации см. SPARK-5063

Однако, похоже, это не решило мою проблему. Любые выводы будут оценены.

Мне нужно иметь эти два отдельных файла, Inline не будет работать. Код запускается с использованием:

sudo $SPARK_HOME/spark-submit --master local[2] /home/user/master.py

1 Ответ

0 голосов
/ 05 сентября 2018

Я думаю, что вы должны использовать SparkSession.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc').getOrCreate()

Вы можете передать искру в качестве аргумента функции обогащения:

def enrichment(spark):
    test_df = pd.DataFrame(some operations...)
    spark_df = spark.createDataFrame(test_df)
    ...

или:

def enrichment():
    spark = SparkSession.builder.getOrCreate()
    test_df = pd.DataFrame(some operations...)
    spark_df = spark.createDataFrame(test_df)
    ...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...