У меня есть конвейер предварительной обработки данных, где я очищаю данные от десятков тысяч твитов. Я хочу сохранить свой фрейм данных поэтапно, чтобы я мог загрузить эти «точки сохранения» из более поздних стадий моего конвейера. Я читал, что сохранение фрейма данных в формате паркета является наиболее «эффективным» методом записи, поскольку он быстрый, масштабируемый и т. Д. c. и это идеально для меня, так как я пытаюсь учитывать масштабируемость для этого проекта.
Однако я столкнулся с проблемой, когда не могу сохранить поля, содержащие структуры, в файл. Я получаю JSON ошибку json.decoder.JSONDecodeError: Expecting ',' delimiter: ...
при попытке вывести мой фрейм данных (подробнее см. Ниже).
Мой фрейм данных находится в следующем формате:
+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
| id| timestamp| tweet_text| tweet_hashtags|tweet_media| tweet_urls| topic| categories|priority|
+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
|266269932671606786|1536170446|Eight dead in the...| []| []| []|guatemalaEarthqua...|[Report-EmergingT...| Low|
|266804609954234369|1536256997|Guys, lets help ... |[[Guatemala, [72,...| []|[[http:url... |guatemalaEarthqua...|[CallToAction-Don...| Medium|
|266250638852243457|1536169939|My heart goes out...|[[Guatemala, [31,...| []| []|guatemalaEarthqua...|[Report-EmergingT...| Medium|
|266381928989589505|1536251780|Strong earthquake...| []| []|[[http:url... |guatemalaEarthqua...|[Report-EmergingT...| Medium|
|266223346520297472|1536167235|Magnitude 7.5 Qua...| []| []| []|guatemalaEarthqua...|[Report-EmergingT...| Medium|
+------------------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------+
only showing top 5 rows
со следующей схемой для ясности:
root
|-- id: string (nullable = true)
|-- timestamp: long (nullable = true)
|-- tweet_text: string (nullable = true)
|-- tweet_hashtags: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- text: string (nullable = false)
| | |-- indices: array (nullable = false)
| | | |-- element: integer (containsNull = true)
|-- tweet_media: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id_str: string (nullable = true)
| | |-- type: string (nullable = false)
| | |-- url: string (nullable = true)
| | |-- media_url: string (nullable = true)
| | |-- media_https: string (nullable = true)
| | |-- display_url: string (nullable = true)
| | |-- expanded_url: string (nullable = true)
| | |-- indices: array (nullable = false)
| | | |-- element: integer (containsNull = true)
|-- tweet_urls: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- url: string (nullable = false)
| | |-- display_url: string (nullable = true)
| | |-- expanded_url: string (nullable = true)
| | |-- indices: array (nullable = false)
| | | |-- element: integer (containsNull = true)
|-- topic: string (nullable = true)
|-- categories: array (nullable = true)
| |-- element: string (containsNull = true)
|-- priority: string (nullable = true)
Я пытаюсь сохранить этот кадр данных в формате паркета со следующей строкой:
df.write.mode('overwrite').save(
path=f'{DATA_DIR}/interim/feature_select.parquet',
format='parquet')
, а также с df.write.parquet(f'{DATA_DIR}/interim/feature_select.parquet', mode='overwrite')
.
Однако при попытке сохранить эти файлы мне выдается ошибка json.decoder.JSONDecodeError: Expecting ',' delimiter: ...
:
File "features.py", line 207, in <lambda>
entities_udf = F.udf(lambda s: _convert_str_to_arr(s), v)
File "features.py", line 194, in _convert_str_to_arr
arr = [json.loads(x) for x in arr]
File "features.py", line 194, in <listcomp>
arr = [json.loads(x) for x in arr]
File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/__init__.py", line 348, in loads
return _default_decoder.decode(s)
File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/media/ntfs/anaconda3/envs/py37/lib/python3.7/json/decoder.py", line 353, in raw_decode
obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Expecting ',' delimiter: line 1 column 93 (char 92)
Строка, указанная в коде ошибки, также относится к более раннему преобразованию UDF
, которое я сделал для ряда столбцы (столбцы tweet_*
). Это прекрасно работает, когда я удаляю писателя.
Я не смог найти много о том, как определить разделитель для паркетных файлов, это возможно? Или мне придется сериализовать какие-либо данные, которые содержат запятую? Или мне даже придется взять структуры Spark, которые я проанализировал и изменил, и преобразовать их обратно в JSON, чтобы сохранить файл?