Как проверить, что файл JSON поврежден, например отсутствует {,}, запятая или неправильный тип данных. Я пытаюсь достичь с помощью аккумулятора, потому что процесс работает на нескольких исполнителей.
spark_config = SparkConf().setAppName(application_name)
ss = SparkSession.builder.config(conf=spark_config).getOrCreate()
class StringAccumulatorParam(AccumulatorParam):
def zero(self, v):
return []
def addInPlace(self, variable, value):
variable.append(value)
return variable
errorCount = ss.sparkContext.accumulator(0)
errorValues = ss.sparkContext.accumulator("", StringAccumulatorParam())
newSchema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True)
StructField("status", BooleanType(), True)])
errorDF = ss.read.json("/Users/test.jsonl")
errorDF2 = ss.createDataFrame(errorDF, newSchema).cache()
def checkErrorCount(row):
global errorCount
errorDF2["id"] = row. newSchema["id"]
errorCount.add(1)
errorValues.add(errorDF2["id"])
errorDF.foreach(lambda x: checkErrorCount(x))
print("{} rows had questionable values.".format(errorCount.value))
ss.stop()
Вот поврежденный файл JSON -
{"name":"Standards1","id":90,"status":true}
{"name":"Standards2","id":91
{"name":"Standards3","id":92,"status":true}
{"name":781,"id":93,"status":true}