Почему PySpark выдает ошибку JSONDecodeError при попытке загрузить строку из Kafka topi c в словарь? - PullRequest
0 голосов
/ 30 мая 2020

Я использую Spark Structured Streaming для чтения из Kafka topi c (записи в формате JSON) с намерением выполнить некоторые преобразования, как только я получу данные в Spark. Мой первый шаг - прочитать записи из topi c в виде строк, а затем преобразовать их в словарь (через библиотеку python json) для выполнения преобразований (я не хочу использовать возможность to_json в Spark потому что формат записей в моем Kafka topi c может измениться, и я не хочу использовать Spark Streaming , который не требует схемы, потому что нет встроенных функций для обратной записи в темы). Я получил JSONDecodeError (ниже) при попытке преобразовать строки.

Вот компоненты, которые вызывают мою ошибку:

Я запускаю следующий код PySpark локально:

from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
import pyspark
import json
from pyspark.sql.types import StringType

def str_to_json(s):
    j = json.loads(s)
    return j

if __name__ == "__main__":
    spark = SparkSession.builder \
        .master("local") \
        .appName("App Name") \
        .getOrCreate()

    strToJson = udf(str_to_json, StringType())

    spark.udf.register("strToJson", strToJson)

    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "first_topic") \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    df_transformed = df.withColumn('strToJson', strToJson('value'))

    query = df_transformed \
        .writeStream \
        .format("console") \
        .outputMode("update") \
        .start()

Затем я записываю запись в свой Kafka topi c, используя kafka-console-producer также с моего локального компьютера, в следующем формате:

'{"name":"bob"}'

Следующая ошибка затем выводится на консоль, когда Spark пытается использовать UDF для преобразования значения записи (из Kafka Topi c) в объект JSON / словаря в задании PySpark:

raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Я добавил пару отладочных операторов печати в UDF, чтобы увидеть, какое значение передается:

>>> type(s)
<class 'str'>

>>> print(s)
'{"name": "bob"}'

1 Ответ

1 голос
/ 30 мая 2020

Можете ли вы попробовать приведенный ниже код, чтобы создать сообщение json для производителя консоли kafka?

сохранить записи json в файле, как показано ниже

[nsr@inlthome]$ cat > samplerecords.json
{"name": "bob"}
{"name": "john"}

и запустить производитель консоли

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first_topic < samplerecords.json  

Я пробовал запустить программу ниже: enter image description here

...