Это не странно. Поскольку вы не предоставили схему, Spark должен определить ее на основе данных. Если RDD
является входом, он будет вызывать SparkSession._createFromRDD
, а затем SparkSession._inferSchema
, который, если отсутствует samplingRatio
, оценивает до 100 строка : * +1011 *
first = rdd.first()
if not first:
raise ValueError("The first row in RDD is empty, "
"can not infer schema")
if type(first) is dict:
warnings.warn("Using RDD of dict to inferSchema is deprecated. "
"Use pyspark.sql.Row instead")
if samplingRatio is None:
schema = _infer_schema(first, names=names)
if _has_nulltype(schema):
for row in rdd.take(100)[1:]:
schema = _merge_type(schema, _infer_schema(row, names=names))
if not _has_nulltype(schema):
break
else:
raise ValueError("Some of types cannot be determined by the "
"first 100 rows, please try again with sampling")
Теперь остается только загадка, если она не оценивает ровно одну запись. Ведь в вашем случае first
не является пустым и не содержит None
.
Это потому, что first
реализован через take
и не гарантирует, что будет оценено точное количество предметов. Если первый раздел не даст нужного количества элементов, он будет многократно увеличивать количество сканируемых разделов. Пожалуйста, проверьте реализацию для деталей.
Если вы хотите избежать этого, вы должны использовать createDataFrame
и предоставить схему в виде строки DDL:
spark.createDataFrame(a.map(f), "val: integer")
или эквивалент StructType
.
Подобного поведения вы не найдете в аналоге Scala, поскольку он не использует логический вывод схемы в toDF
. Он либо извлекает соответствующую схему из Encoder
(которая выбирается с использованием отражения Scala), либо вообще не разрешает преобразование. Наиболее близким подобным поведением является вывод на входном источнике, таком как CSV или JSON :
spark.read.json(Seq("""{"foo": "bar"}""").toDS.map(x => { println(x); x }))