Spark: вернуть пустой столбец, если столбец не существует в кадре данных - PullRequest
0 голосов
/ 04 октября 2018

Как показано в приведенном ниже коде, я считываю файл JSON в фрейм данных и затем выбираю некоторые поля из этого фрейма в другой.

df_record = spark.read.json("path/to/file.JSON",multiLine=True)

df_basicInfo = df_record.select(col("key1").alias("ID"), \
                                col("key2").alias("Status"), \
                                col("key3.ResponseType").alias("ResponseType"), \
                                col("key3.someIndicator").alias("SomeIndicator") \
                                )

Проблема заключается в том, что иногда файл JSONнет некоторых ключей, которые я пытаюсь получить - например, ResponseType.Таким образом, в конечном итоге выдается сообщение об ошибке:

org.apache.spark.sql.AnalysisException: No such struct field ResponseType

Как обойти эту проблему, не применяя схему во время чтения?можно ли заставить его возвращать NULL в этом столбце, когда он недоступен?

как определить, есть ли в кадре данных искры столбец В нем упоминается, как определить, является ли столбецдоступны в кадре данных.Этот вопрос, однако, о том, как использовать эту функцию.

Ответы [ 3 ]

0 голосов
/ 28 июня 2019

В Spark отсутствует простая функция: struct_has(STRUCT, PATH) или struct_get(STRUCT, PATH, DEFAULT), где PATH используется точечная нотация.

Поэтому я написал очень простой UDF:

С https://gist.github.com/ebuildy/3c9b2663d47f7b65fbc12cfb469ae19c:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.Row

spark.udf.register("struct_def", (root:GenericRowWithSchema, path: String, defaultValue: String) => {

    var fields = path.split("\\.")
    var buffer:Row = root
    val lastItem = fields.last

    fields = fields.dropRight(1)

    fields.foreach( (field:String) => {
        if (buffer != null) {
            if (buffer.schema.fieldNames.contains(field)) {
                buffer = buffer.getStruct(buffer.fieldIndex(field))
            } else {
                buffer = null
            }
        }
    })

    if (buffer == null) {
        defaultValue
    } else {
        buffer.getString(buffer.fieldIndex(lastItem))
    }
})

Это позволит вам сделать запрос следующим образом:

SELECT struct_get(MY_COL, "foo.bar", "no") FROM DATA
0 голосов
/ 30 июля 2019

У меня была та же проблема, я использовал тот же подход, что и Томас.Мой пользовательский код функции:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.Row

spark.udf.register("tryGet", (root:GenericRowWithSchema, fieldName: String) => {
    var buffer:Row = root

    if (buffer != null) {
      if (buffer.schema.fieldNames.contains(fieldName)) {
         buffer.getString(buffer.fieldIndex(fieldName))
      } else {
        null
      }
    }
    else {
      null
    }
})

, а затем мой запрос:

%sql

SELECT
  Id,
  Created,
  Payload.Type,
  tryGet(Payload, "Error") as Error,
FROM dataWithJson
WHERE Payload.Type = 'Action'
0 голосов
/ 04 октября 2018

Используя функцию has_column, определите здесь с помощью zero323 и общие рекомендации по добавлению пустых столбцов либо

from pyspark.sql.functions import lit, col, when
from pyspark.sql.types import *

if has_column(df_record, "key3.ResponseType"):
    df_basicInfo = df_record.withColumn("ResponseType", col("key3.ResponseType"))
else:
    # Adjust types according to your needs
    df_basicInfo = df_record.withColumn("ResponseType", lit(None).cast("string")) 

и повторите длякаждый столбец, который вам нужен, или

df_record.withColumn(
   "ResponseType", 
   when(
       lit(has_column(df_record, "key3.ResponseType")),
       col("key3.ResponseType")
   ).otherwise(lit(None).cast("string"))

Настройте типы в соответствии с вашими требованиями и повторите процедуру для остальных столбцов.

В качестве альтернативы определите схему, охватывающую все требуемые типы:

schema = StructType([
    StructField("key1", StringType()),
    StructField("key2", StringType()),
    StructField("key2", StructType([
        StructField("ResponseType", StringType()),
        StructField("someIndicator", StringType()),
    ]))
])

df_record = spark.read.schema(schema).json("path/to/file.JSON",multiLine=True)

(еще раз настройте типы) и используйте свой текущий код.

...