Есть ли способ динамически создавать информацию схемы с помощью pyspark, а не экранировать символы в выходном jsonfile? - PullRequest
0 голосов
/ 28 декабря 2018

В настоящее время pyspark форматирует logFile, затем загружает красное смещение.

Анализирует каждый элемент о logFile, выведенном в формате json, добавляет элемент и загружает его в Redshift.Тем не менее, формат некоторых элементов отличается для каждого типа.(Для того же элемента Shcema применяется заранее.) Экранирующие символы будут вводиться даже при выводе, как есть. There Есть ли способ динамического создания информации о схеме, и в выходном файле jsonfile нет escape-символа?

-Окружающая среда -

- spark 2.4.0
- python version 2.7.15

- DataFrame -

>> df.printSchema()
root
 |-- Name: string (nullable = false)
 |-- d: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

>> df.show(2,False)
+------+------------------------------------------------------------+
|Name  |d                                                           |
+------+------------------------------------------------------------+
|Amber |[Body -> {"City": "Oregon", "Country": "US"}, BodyType -> 1]|
|Alfred|[Body -> {"Weight": 80, "Height": 176}, BodyType -> 2]      |
+------+------------------------------------------------------------+

- Схема (для общего элемента) -

>> print(json.dumps(schema.jsonValue(), indent=2))
{
  "fields": [
    {
      "metadata": {}, 
      "type": "string", 
      "name": "Name", 
      "nullable": false
    }, 
    {
      "metadata": {}, 
      "type": {
        "keyType": "string", 
        "type": "map", 
        "valueType": "string", 
        "valueContainsNull": true
      }, 
      "name": "d", 
      "nullable": false
    }
  ], 
  "type": "struct"
}

- Код -

from pyspark.sql.types import *

rdd = sc.parallelize([("Amber", {"Body": "{\"City\": \"Oregon\", \"Country\": \"US\"}", "BodyType": 1}), ("Alfred", {"Body": "{\"Weight\": 80, \"Height\": 176}", "BodyType": 2})])
schema = StructType([StructField('Name',StringType(), False)
    ,StructField('d',MapType(StringType(),StringType()), False)])
df = spark.createDataFrame(rdd, schema)

- Выходной файл json -

{"Name":"Amber","d":{"Body":"{\"City\": \"Oregon\", \"Country\": \"US\"}","BodyType":"1"}}
{"Name":"Alfred","d":{"Body":"{\"Weight\": 80, \"Height\": 176}","BodyType":"2"}}

- Выходной файл json (идеально) -

{"Name":"Amber","d":{"Body":"{\"City\": \"Oregon\", \"Country\": \"US\"}","BodyType":"1"}, "Body":{"City": "Oregon", "Country": "US"}}
{"Name":"Alfred","d":{"Body":"{\"Weight\": 80, \"Height\": 176}","BodyType":"2"}, "Body":{"Weight": 80, "Height": 176}}

Я попытался использовать schema_of_json () и from_json () из pyspark.sql.functions, но это не сработало.(schema_of_json может принимать только символьные литералы)

- Результаты испытаний -

from pyspark.sql.functions import schema_of_json
from pyspark.sql.functions import from_json
df = df.withColumn('Body', df.select(from_json(df.d.body,schema_of_json(df.d.Body))))

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/functions.py", line 2277, in from_json
    jc = sc._jvm.functions.from_json(_to_java_column(col), schema, options)
  File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of schemaofjson(`d`['Body']);"

1 Ответ

0 голосов
/ 09 августа 2019

tl; dr

Короткий ответ - нет, нет способа динамически вывести схему для каждой строки и получить столбец, в котором разные строки имеют разные схемы.

Однако есть способ вывести желаемую строку json и согласовать разные json в общую схему с богатым типом

Подробно

Если бы это было разрешено, это было бы мучительно медленно, но, что более важно, это не разрешено, потому что это нарушает реляционную модель, которая позволяет SparkSQL работать согласованно.

Кадр данных состоит из столбцов (полей), а столбец имеет толькоодин тип данных;тип данных представляет весь столбец.Это не является строго обязательным для исполнения в Pyspark, учитывая природу Python, но важно во время выполнения, чтобы этот оператор все еще применялся.

В вашем примере, если вы хотите project атрибут City с чем-токак d.Body.City, тогда это должно существовать как для Альфреда, так и для Эмбер.По крайней мере, метаданные для этого поля должны существовать, даже если нет значения.Механизм выполнения должен быстро узнать, является ли путь неверным или нет, чтобы избежать бессмысленного сканирования каждой строки.

Существует несколько способов согласования нескольких типов в одном столбце (я уверен, что я могу сделать больше)t думать о):

  1. использовать вариант / union / option type (например, объединить все общие и необычные схемы json вместе)
  2. сериализовать его в нечто, например, в строку json (это то место, где вы начинаете перед применением этой jsonschema, отлично подходит для передачи данных, не очень хорошо для анализа)
  3. превратить его в супертип, коробочный или универсальный объект (как в RDD), имеющий наименьший общий знаменательповедение / интерфейс / свойство (теряет метаданные и атрибуты подтипов)
  4. Не храните его как один тип, например, храните отдельные варианты в разных столбцах и используйте разные схемы json для каждого

Мне нравится (1) в этом случае, но (4) может быть допустимым в качестве промежуточного шага к поиску универсальной схемы.

Ваш пример "обычной" схемы json больше похож на option (3).Внутри карты, которую вы назвали «d» (я полагаю, потому что это диктат?) Информация о полях недоступна без сканирования данных.

root
 |-- Name: string (nullable = false)
 |-- d: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

Я понимаю, что это всего лишь промежуточный шаг кдобавление нового столбца, содержащего Body, но для этого необходимо перечислить все возможные ключи на этой карте в более полезную схему.

Решение

Универсальный (общий)схема не является общей картой string -> string, я думаю, что она более полезна, как показано ниже.Он близок к тому, что вы пробовали изначально, но не является динамическим и действителен для обеих строк.Обратите внимание, что nullable является значением по умолчанию True для всех атрибутов

schema_body = StructType([
    StructField("City", StringType()),
    StructField("Country", StringType()),
    StructField("Weight", IntegerType()),
    StructField("Height", IntegerType())
])

df = df.withColumn("Body", from_json("d.Body", schema_body))
df.printSchema()

root
 |-- Name: string (nullable = false)
 |-- d: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- Body: struct (nullable = true)
 |    |-- City: string (nullable = true)
 |    |-- Country: string (nullable = true)
 |    |-- Weight: integer (nullable = true)
 |    |-- Height: integer (nullable = true)


df.show(2, False)

+------+---------------------------------------------------------------+---------------------+
|Name  |d                                                              |Body                 |
+------+---------------------------------------------------------------+---------------------+
|Amber |Map(Body -> {"City": "Oregon", "Country": "US"}, BodyType -> 1)|[Oregon,US,null,null]|
|Alfred|Map(Body -> {"Weight": 80, "Height": 176}, BodyType -> 2)      |[null,null,80,176]   |
+------+---------------------------------------------------------------+---------------------+

Теперь вы можете легко добраться до Body.City, выбрав d.Body.City, не беспокоясь о том, какие строки имеют город.

Для вашего следующего шага вы можете вернуть его обратно в строку json

df = df.withColumn("Body", to_json("d.Body"))

Вы также можете объединить его с предыдущим шагом

df = df.withColumn("Body", to_json(from_json("d.Body", schema_body)))
df.printSchema()
root
 |-- Name: string (nullable = false)
 |-- BodyAttributes: struct (nullable = true)
 |    |-- Body: string (nullable = true)
 |    |-- BodyType: integer (nullable = true)
 |-- Body: string (nullable = true)

df.show(2, False)

+------+---------------------------------------+--------------------------------+
|Name  |BodyAttributes                         |Body                            |
+------+---------------------------------------+--------------------------------+
|Amber |[{"City": "Oregon", "Country": "US"},1]|{"City":"Oregon","Country":"US"}|
|Alfred|[{"Weight": 80, "Height": 176},2]      |{"Weight":80,"Height":176}      |
+------+---------------------------------------+--------------------------------+

Примечаниепри преобразовании его обратно в строку json эти значения NULL исчезают.Также теперь jsonstring так легко записать в файл, как вы хотели.


Идем дальше

Если бы вы делали это как часть процесса, чтобы сделать данные доступными для анализа, составления отчетов или других целей, я бы сделал что-то вроде этого

schema = StructType([
    StructField('Name',StringType(), False),
    StructField(
        'd',
        StructType([
            StructField("Body", StringType()),
            StructField("BodyType", IntegerType())
        ])
    )
])

df = spark.createDataFrame(rdd, schema)
df = df.withColumn(
    "Body", 
    from_json("d.Body", schema_body)
).withColumn(
    "BodyType", 
    col("d.BodyType")
).drop("d")

df.printSchema()

root
 |-- Name: string (nullable = false)
 |-- Body: struct (nullable = true)
 |    |-- City: string (nullable = true)
 |    |-- Country: string (nullable = true)
 |    |-- Weight: integer (nullable = true)
 |    |-- Height: integer (nullable = true)
 |-- BodyType: integer (nullable = true)


df.show(2, False)

+------+---------------------+--------+
|Name  |Body                 |BodyType|
+------+---------------------+--------+
|Amber |[Oregon,US,null,null]|1       |
|Alfred|[null,null,80,176]   |2       |
+------+---------------------+--------+

Затем вы можете выбрать Body.City, Body.Country, Body.Weight, Body.Height`

Вы могли бы сделать еще один шаг, но это действительно зависело бы от того, сколько из этих возможных ключей тела существуети насколько оно редкое.

df = df.withColumn(
    "City", col("Body.City")
).withColumn(
    "Country", col("Body.Country")
).withColumn(
    "Weight", col("Body.Weight")
).withColumn(
    "Height", col("Body.Height")
).drop("Body")

df.printSchema()

root
 |-- Name: string (nullable = false)
 |-- BodyType: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Height: integer (nullable = true)

df.show(2, False)

+------+--------+------+-------+------+------+
|Name  |BodyType|City  |Country|Weight|Height|
+------+--------+------+-------+------+------+
|Amber |1       |Oregon|US     |null  |null  |
|Alfred|2       |null  |null   |80    |176   |
+------+--------+------+-------+------+------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...