Ошибка при передаче кадра данных в UDF в структурированной потоковой передаче - PullRequest
0 голосов
/ 13 июля 2020

Я читаю события из Kafka в структурированной потоковой передаче Spark, и мне нужно обрабатывать события одно за другим и записывать в redis. Я написал для этого UDF, но он дает мне ошибку контекста искры.

conf = SparkConf()\
.setAppName(spark_app_name)\
.setMaster(spark_master_url)\
.set("spark.redis.host", "redis")\
.set("spark.redis.port", "6379")\
.set("spark.redis.auth", "abc")

spark = SparkSession.builder\
.config(conf=conf)\
.getOrCreate()

def func(element, event, timestamp):
    #redis i/o
    pass

schema = ArrayType(StructType(
[
    StructField("element_id", StringType()),
    StructField("event_name", StringType()),
    StructField("event_time", StringType())
]
))

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", topic) \
    .load()
    #.option("includeTimestamp", value = True)\

ds = df.selectExpr(("CAST(value AS STRING)"))\
     .withColumn("value", explode(from_json("value", schema)))

filter_func = udf(func, ArrayType(StringType()))

ds = ds.withColumn("column_name", filter_func(
    ds['value']['element_id'], 
    ds['value']['event_name'], 
    ds['value']['event_time']
))

query = ds.writeStream \
        .format("console") \
        .start()

query.awaitTermination()

Сообщение об ошибке: _pickle.PicklingError: не удалось сериализовать объект: Исключение: похоже, вы пытаетесь ссылаться на SparkContext из широковещательной переменной , действие или преобразование. SparkContext можно использовать только в драйвере, но не в коде, который он запускает на рабочих. Для получения дополнительной информации см. SPARK-5063.

Любая помощь приветствуется.

1 Ответ

0 голосов
/ 14 июля 2020

Я пытался получить доступ к контексту искры из пользовательской функции, что запрещено. В udf я пытался писать в spark-redis, используя контекст искры.

...