Фрагменты кода:
Файл 1: master.py
# Spark Imports
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
#Import self defined function
from helper import enrichment
def ingestion(sc,ssc):
# Work with stream
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "streaming-consumer", {topic: 1})
# Call function defined in helper.py
enriched_data = kafkaStream_json.map(lambda single_log:enrichment(single_log,client_id,machine_id))
if __name__ == "__main__":
# Name of Spark App
conf = SparkConf().setAppName("Test")
# Spark and Spark streaming configuration
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)
ingestion(sc,ssc)
# Start the stream and keep it running unless terminated
ssc.start()
ssc.awaitTermination()
Файл2: helper.py
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
def enrichment():
test_df = pd.DataFrame(some operations...)
spark_df = sqlContext.createDataFrame(test_df)
...
Возникли проблемы:
Потоковая часть этого работает нормально, однако, когда я вызываю функцию enrichment
, это следующие проблемы, с которыми я сталкиваюсь в зависимости от использования:
Case1: Когда приведенный выше пример запускается, он говорит:
spark_df = sqlContext.createDataFrame(test_df)
NameError: global name 'sqlContext' is not defined
Случай 2: Когда я передаю искровой контекст в качестве аргумента, это сообщение появляется:
"Исключение: похоже, вы пытаетесь сослаться
SparkContext из широковещательной переменной, действия или трансформации.
SparkContext может использоваться только в драйвере, а не в коде, который он запускает
на рабочих. Для получения дополнительной информации см. SPARK-5063. "
Это самое близкое решение, которое я нашел:
ОШИБКА: SparkContext может использоваться только в драйвере, а не в коде, который он запускает на рабочих. Для получения дополнительной информации см. SPARK-5063
Однако, похоже, это не решило мою проблему. Любые выводы будут оценены.
Мне нужно иметь эти два отдельных файла, Inline не будет работать. Код запускается с использованием:
sudo $SPARK_HOME/spark-submit --master local[2] /home/user/master.py