Сохранение фрейма данных pyspark со сложной схемой в виде простого текста для тестирования - PullRequest
2 голосов
/ 14 марта 2019

Как сделать чистые тестовые данные для pyspark? Я понял кое-что, что кажется довольно хорошим, но части кажутся немного неловкими, поэтому я публикую.

Допустим, у меня есть фрейм данных df со сложной схемой и небольшим количеством строк. Я хочу проверить данные теста в моем репо. Я не хочу двоичный файл. На данный момент, я не уверен, что лучший способ продолжить, но я думаю, у меня есть файл, как

test_fn.py

и в нем есть это

schema_str='struct<eventTimestamp:timestamp,list_data:array<struct<valueA:string,valueB:string,valueC:boolean>>>' 

для получения схемы в формате txt, используя функцию df.schema.simpleString(). Затем, чтобы получить строки - я делаю

lns = [row.json_txt for row in df.select((F.to_json(F.struct('*'))).alias('json_txt')).collect()]

теперь я помещаю эти строки в мой test_fn.py файл, или я могу иметь файл .json в репо.

Теперь, чтобы запустить тест, мне нужно создать фрейм данных с правильной схемой и данными из этого текста. Кажется, единственный способ, которым spark анализирует простую строку, - это если я создаю с ней фрейм данных, то есть я не могу передать эту простую строку в функцию from_json? Так что это немного неловко, поэтому я решил опубликовать -

schema2 = spark.createDataFrame(data=[], schema=schema_str).schema
lns = # say I read the lns back from above 
df_txt = spark.createDataFrame(data=lns, schema=T.StringType())

Я вижу, что у df_txt только один столбец с именем 'value'

df_json = df_txt.select(F.from_json('value', schema=schema2).alias('xx'))
sel = ['xx.%s' % nm for nm in df_json.select('xx').schema.fields[0].dataType.fieldNames()]
df2 = df_json.select(*sel)

Теперь df2 должно быть таким же, как df1 - что, как я вижу, относится к модулю deepdiff.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...