Как сделать чистые тестовые данные для pyspark? Я понял кое-что, что кажется довольно хорошим, но части кажутся немного неловкими, поэтому я публикую.
Допустим, у меня есть фрейм данных df
со сложной схемой и небольшим количеством строк. Я хочу проверить данные теста в моем репо. Я не хочу двоичный файл. На данный момент, я не уверен, что лучший способ продолжить, но я думаю, у меня есть файл, как
test_fn.py
и в нем есть это
schema_str='struct<eventTimestamp:timestamp,list_data:array<struct<valueA:string,valueB:string,valueC:boolean>>>'
для получения схемы в формате txt, используя функцию df.schema.simpleString()
. Затем, чтобы получить строки - я делаю
lns = [row.json_txt for row in df.select((F.to_json(F.struct('*'))).alias('json_txt')).collect()]
теперь я помещаю эти строки в мой test_fn.py
файл, или я могу иметь файл .json
в репо.
Теперь, чтобы запустить тест, мне нужно создать фрейм данных с правильной схемой и данными из этого текста. Кажется, единственный способ, которым spark анализирует простую строку, - это если я создаю с ней фрейм данных, то есть я не могу передать эту простую строку в функцию from_json
? Так что это немного неловко, поэтому я решил опубликовать -
schema2 = spark.createDataFrame(data=[], schema=schema_str).schema
lns = # say I read the lns back from above
df_txt = spark.createDataFrame(data=lns, schema=T.StringType())
Я вижу, что у df_txt только один столбец с именем 'value'
df_json = df_txt.select(F.from_json('value', schema=schema2).alias('xx'))
sel = ['xx.%s' % nm for nm in df_json.select('xx').schema.fields[0].dataType.fieldNames()]
df2 = df_json.select(*sel)
Теперь df2
должно быть таким же, как df1
- что, как я вижу, относится к модулю deepdiff
.