Pyspark - проверка формата Json с помощью аккумулятора - PullRequest
0 голосов
/ 07 мая 2018

Как проверить, что файл JSON поврежден, например отсутствует {,}, запятая или неправильный тип данных. Я пытаюсь достичь с помощью аккумулятора, потому что процесс работает на нескольких исполнителей.

spark_config = SparkConf().setAppName(application_name)
ss = SparkSession.builder.config(conf=spark_config).getOrCreate()

class StringAccumulatorParam(AccumulatorParam):
  def zero(self, v):
      return []
  def addInPlace(self, variable, value):
      variable.append(value)
      return variable
errorCount = ss.sparkContext.accumulator(0)
errorValues = ss.sparkContext.accumulator("", StringAccumulatorParam())

newSchema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
    StructField("status", BooleanType(), True)])

errorDF = ss.read.json("/Users/test.jsonl")
errorDF2 = ss.createDataFrame(errorDF, newSchema).cache()

def checkErrorCount(row):
   global errorCount
   errorDF2["id"] = row. newSchema["id"]
      errorCount.add(1)
      errorValues.add(errorDF2["id"])

errorDF.foreach(lambda x: checkErrorCount(x))
print("{} rows had questionable values.".format(errorCount.value))

ss.stop()

Вот поврежденный файл JSON -

{"name":"Standards1","id":90,"status":true}
{"name":"Standards2","id":91
{"name":"Standards3","id":92,"status":true}
{"name":781,"id":93,"status":true}

1 Ответ

0 голосов
/ 08 мая 2018

Я поиграл с этим и придумал следующее.

Думаю, что из 2-х решений разница в подсчетах будет быстрее, поскольку будет использоваться собственная обработка Spark JSON.

Решение UDF выполнит синтаксический анализ JSON в Python, то есть вам придется оплатить стоимость переноса каждой строки файла из Java в Python, поэтому, вероятно, будет медленнее.

import json
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, udf
from pyspark.sql.types import LongType

application_name = 'Count bad JSON lines'
spark_config = SparkConf().setAppName(application_name)
ss = SparkSession.builder.config(conf=spark_config).getOrCreate()

# Difference of counts solution
input_path = '/baddata.json'
total_lines = ss.read.text(input_path).count()
good_lines = ss.read.option('mode', 'DROPMALFORMED').json(input_path).count()
bad_lines = total_lines - good_lines
print('Found {} bad JSON lines in data'.format(bad_lines))

# Parse JSON with UDF solution
def is_bad(line):
    try:
        json.loads(line)
        return 0
    except ValueError:
        return 1

is_bad_udf = udf(is_bad, LongType())
lines = ss.read.text(input_path)
bad_sum = lines.select(sum(is_bad_udf('value'))).collect()[0][0]
print('Got {} bad lines'.format(bad_sum))

ss.stop()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...