Input
PySpark DF считывает данные из файла JSON (вывод предыдущего задания ETL) со сложной структурой данных (множество вложенных полей). Файл содержит более 100 000 записей.
Мне нужно отфильтровать записи, которые имеют непустое поле name.en.
Предварительно я вручную проверил содержимое входного файла JSON - fieldc'name.en 'пусто для 5% всех записей, поэтому я ожидаю увидеть 95 000 записей в выводе.
Задача
Когда Spark считывает данные и утверждает, что в MEMORY_ONLY
(или MEMORY_AND_DISK
) большинство записей исчезают.
Случай № 1 - с сохранением (не работает)
df = sparkSession.read.format('json').schema(REQUIRED_SCHEMA).load(JSON_FILE_PATH)
df.persist(pyspark.StorageLevel.MEMORY_ONLY)
print(df.filter('name.en IS NOT NULL').count()) #returns 115 records instead of 95,000+
объяснение для df.filter ('name.en IS NOT NULL') с persist()
:
== Physical Plan ==
Filter (isnotnull(name#2) && isnotnull(name#2.en))
+- InMemoryTableScan [_id#0, field1#1, name#2, ..., field31#31], [isnotnull(name#2), isnotnull(name#2.en)]
+- InMemoryRelation [_id#0, field1#1, name#2, ..., field31#31], true, 10000, StorageLevel(memory, 1 replicas)
+- FileScan json [_id#0,field#1,name#2,..., field31#31] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/data/myfile.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_id:struct<oid:string>,field1:string,name:struct<en:string>,field3:struct<oi...
Но когда я удаляю вызов persist (), он возвращает правильное количество записей.
- Случай № 2 - Без сохранения (работает как положено)
df = sparkSession.read.format('json').schema(REQUIRED_SCHEMA).load(JSON_FILE_PATH)
print(df.filter('name.en IS NOT NULL').count()) #returns 95,000+ records as expected
объяснение для df.filter ('name.en NOT NULL') без persist()
:
== Physical Plan ==
Project [_id#0, field1#1, name#2, ... , field31#31]
+- Filter (isnotnull(name#2) && isnotnull(name#2.en))
+- FileScan json [_id#0,field1#1,name#2,...,field31#31] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/data/myfile.json], PartitionFilters: [], PushedFilters: [IsNotNull(name)], ReadSchema: struct<_id:struct<oid:string>,field1:string,name:struct<en:string>,field3:struct<oi...
Таким образом, единственная разница - это наличие / отсутствие persist()
вызова.
Что может вызвать такие странные результаты?