PySpark: невозможно записать структуры (DF -> Parquet) - PullRequest
2 голосов
/ 21 апреля 2020

У меня есть конвейер предварительной обработки данных, где я очищаю данные от десятков тысяч твитов. Я хочу сохранить свой фрейм данных поэтапно, чтобы я мог загрузить эти «точки сохранения» из более поздних стадий моего конвейера. Я читал, что сохранение фрейма данных в формате паркета является наиболее «эффективным» методом записи, поскольку он быстрый, масштабируемый и т. Д. c. и это идеально для меня, так как я пытаюсь учитывать масштабируемость для этого проекта.

Однако я столкнулся с проблемой, когда не могу сохранить поля, содержащие структуры, в файл. Я получаю JSON ошибку json.decoder.JSONDecodeError: Expecting ',' delimiter: ... при попытке вывести мой фрейм данных (подробнее см. Ниже).

Мой фрейм данных находится в следующем формате:

+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
|                id| timestamp|          tweet_text|      tweet_hashtags|tweet_media|          tweet_urls|               topic|          categories|priority|
+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
|266269932671606786|1536170446|Eight dead in the...|                  []|         []|                  []|guatemalaEarthqua...|[Report-EmergingT...|     Low|
|266804609954234369|1536256997|Guys, lets help ... |[[Guatemala, [72,...|         []|[[http:url...       |guatemalaEarthqua...|[CallToAction-Don...|  Medium|
|266250638852243457|1536169939|My heart goes out...|[[Guatemala, [31,...|         []|                  []|guatemalaEarthqua...|[Report-EmergingT...|  Medium|
|266381928989589505|1536251780|Strong earthquake...|                  []|         []|[[http:url...       |guatemalaEarthqua...|[Report-EmergingT...|  Medium|
|266223346520297472|1536167235|Magnitude 7.5 Qua...|                  []|         []|                  []|guatemalaEarthqua...|[Report-EmergingT...|  Medium|
+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
only showing top 5 rows

со следующей схемой для ясности:

root
 |-- id: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- tweet_text: string (nullable = true)
 |-- tweet_hashtags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- text: string (nullable = false)
 |    |    |-- indices: array (nullable = false)
 |    |    |    |-- element: integer (containsNull = true)
 |-- tweet_media: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id_str: string (nullable = true)
 |    |    |-- type: string (nullable = false)
 |    |    |-- url: string (nullable = true)
 |    |    |-- media_url: string (nullable = true)
 |    |    |-- media_https: string (nullable = true)
 |    |    |-- display_url: string (nullable = true)
 |    |    |-- expanded_url: string (nullable = true)
 |    |    |-- indices: array (nullable = false)
 |    |    |    |-- element: integer (containsNull = true)
 |-- tweet_urls: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- url: string (nullable = false)
 |    |    |-- display_url: string (nullable = true)
 |    |    |-- expanded_url: string (nullable = true)
 |    |    |-- indices: array (nullable = false)
 |    |    |    |-- element: integer (containsNull = true)
 |-- topic: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- priority: string (nullable = true)

Я пытаюсь сохранить этот кадр данных в формате паркета со следующей строкой:

df.write.mode('overwrite').save(
    path=f'{DATA_DIR}/interim/feature_select.parquet',
    format='parquet')

, а также с df.write.parquet(f'{DATA_DIR}/interim/feature_select.parquet', mode='overwrite').

Однако при попытке сохранить эти файлы мне выдается ошибка json.decoder.JSONDecodeError: Expecting ',' delimiter: ...:

  File "features.py", line 207, in <lambda>
    entities_udf = F.udf(lambda s: _convert_str_to_arr(s), v)
  File "features.py", line 194, in _convert_str_to_arr
    arr = [json.loads(x) for x in arr]
  File "features.py", line 194, in <listcomp>
    arr = [json.loads(x) for x in arr]
  File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/decoder.py", line 353, in raw_decode
    obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Expecting ',' delimiter: line 1 column 93 (char 92)

Строка, указанная в коде ошибки, также относится к более раннему преобразованию UDF, которое я сделал для ряда столбцы (столбцы tweet_*). Это прекрасно работает, когда я удаляю писателя.

Я не смог найти много о том, как определить разделитель для паркетных файлов, это возможно? Или мне придется сериализовать какие-либо данные, которые содержат запятую? Или мне даже придется взять структуры Spark, которые я проанализировал и изменил, и преобразовать их обратно в JSON, чтобы сохранить файл?

1 Ответ

0 голосов
/ 27 апреля 2020

Эта ошибка на самом деле не имеет отношения к паркету вообще. Преобразования в информационном кадре не применяются до тех пор, пока не будет выполнено действие (в этом случае сохранение в паркет). Таким образом, ошибка не будет возникать до этой точки.

Из этой ошибки мы видим, что действительной проблемой является строка:

arr = [json.loads(x) for x in arr]

, которая возникает внутри преобразования UDF.

A json.decoder.JSONDecodeError ошибка возникает, когда есть проблема с JSON. Две распространенные проблемы: он недействителен JSON или существует проблема с кавычками, см. здесь . Итак,

  1. Убедитесь, что столбцы содержат действительные JSON.
  2. Попробуйте заменить \" на \\", это можно сделать как x.replace("\\", r"\\").
...